From 4366c67b27cace82d23ba2d9e1d99b0749860de9 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Sat, 17 Mar 2018 18:50:08 -0700 Subject: [PATCH] NIFI-4927 - InfluxDB Query Processor This closes #2562 Signed-off-by: Mike Thomsen --- .../nifi-influxdb-processors/pom.xml | 5 + .../influxdb/AbstractInfluxDBProcessor.java | 15 +- .../influxdb/ExecuteInfluxDBQuery.java | 262 ++++++++++++++ .../nifi/processors/influxdb/PutInfluxDB.java | 2 + .../org.apache.nifi.processor.Processor | 1 + .../influxdb/AbstractITInfluxDB.java | 83 +++++ .../influxdb/ITExecuteInfluxDBQuery.java | 329 ++++++++++++++++++ .../processors/influxdb/ITPutInfluxDB.java | 55 +-- .../influxdb/TestExecutetInfluxDBQuery.java | 226 ++++++++++++ 9 files changed, 917 insertions(+), 61 deletions(-) create mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java create mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java create mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java create mode 100644 nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml index 75a6ef3be3..0f9d6e96ca 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml @@ -56,6 +56,11 @@ slf4j-simple test + + com.google.code.gson + gson + 2.7 + junit junit diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java index cedde21d28..40dffa9451 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java @@ -15,31 +15,27 @@ * limitations under the License. */ package org.apache.nifi.processors.influxdb; - import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; - import okhttp3.OkHttpClient; import okhttp3.OkHttpClient.Builder; /** * Abstract base class for InfluxDB processors */ -abstract class AbstractInfluxDBProcessor extends AbstractProcessor { +public abstract class AbstractInfluxDBProcessor extends AbstractProcessor { - protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() .name("influxdb-charset") .displayName("Character Set") .description("Specifies the character set of the document data.") @@ -96,7 +92,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .sensitive(true) .build(); - protected static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder() .name("influxdb-max-records-size") .displayName("Max size of records") .description("Maximum size of records allowed to be posted in one batch") @@ -107,7 +103,6 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { .build(); public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message"; - protected AtomicReference influxDB = new AtomicReference<>(); protected long maxRecordsSize; @@ -121,12 +116,11 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS); String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue(); - try { influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout)); } catch(Exception e) { getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() },e); - throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e); + throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e); } getLogger().info("InfluxDB connection created for host {}", new Object[] {influxDbUrl}); @@ -136,7 +130,6 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { - maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue(); } protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java new file mode 100644 index 0000000000..ddb09724ad --- /dev/null +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ + @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), + @WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), + }) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + + public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + + public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() + .name("influxdb-query-result-time-unit") + .displayName("Query Result Time Units") + .description("The time unit of query results from the InfluxDB") + .defaultValue(TimeUnit.NANOSECONDS.name()) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) + .sensitive(false) + .build(); + + public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() + .name("influxdb-query") + .displayName("InfluxDB Query") + .description("The InfluxDB query to execute. " + + "Note: If there are incoming connections, then the query is created from incoming FlowFile's content otherwise" + + " it is created from this property.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successful InfluxDB queries are routed to this relationship").build(); + + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Falied InfluxDB queries are routed to this relationship").build(); + + static final Relationship REL_RETRY = new Relationship.Builder().name("retry") + .description("Failed queries that are retryable exception are routed to this relationship").build(); + + private static final Set relationships; + private static final List propertyDescriptors; + protected Gson gson = new Gson(); + + static { + final Set tempRelationships = new HashSet<>(); + tempRelationships.add(REL_SUCCESS); + tempRelationships.add(REL_FAILURE); + tempRelationships.add(REL_RETRY); + relationships = Collections.unmodifiableSet(tempRelationships); + final List tempDescriptors = new ArrayList<>(); + tempDescriptors.add(DB_NAME); + tempDescriptors.add(INFLUX_DB_URL); + tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT); + tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT); + tempDescriptors.add(INFLUX_DB_QUERY); + tempDescriptors.add(USERNAME); + tempDescriptors.add(PASSWORD); + tempDescriptors.add(CHARSET); + propertyDescriptors = Collections.unmodifiableList(tempDescriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + // Either input connection or scheduled query is required + if ( ! context.getProperty(INFLUX_DB_QUERY).isSet() + && ! context.hasIncomingConnection() ) { + String error = "The InfluxDB Query processor requires input connection or scheduled InfluxDB query"; + getLogger().error(error); + throw new ProcessException(error); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + String query = null; + String database = null; + TimeUnit queryResultTimeunit = null; + Charset charset = null; + FlowFile outgoingFlowFile = null; + + // If there are incoming connections, prepare query params from flow file + if ( context.hasIncomingConnection() ) { + FlowFile incomingFlowFile = session.get(); + + if ( incomingFlowFile == null && context.hasNonLoopConnection() ) { + return; + } + + charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingFlowFile).getValue()); + if ( incomingFlowFile.getSize() == 0 ) { + if ( context.getProperty(INFLUX_DB_QUERY).isSet() ) { + query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue(); + } else { + String message = "FlowFile query is empty and no scheduled query is set"; + getLogger().error(message); + incomingFlowFile = session.putAttribute(incomingFlowFile, INFLUX_DB_ERROR_MESSAGE, message); + session.transfer(incomingFlowFile, REL_FAILURE); + return; + } + } else { + + try { + query = getQuery(session, charset, incomingFlowFile); + } catch(IOException ioe) { + getLogger().error("Exception while reading from FlowFile " + ioe.getLocalizedMessage(), ioe); + throw new ProcessException(ioe); + } + } + outgoingFlowFile = incomingFlowFile; + + } else { + outgoingFlowFile = session.create(); + charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(outgoingFlowFile).getValue()); + query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue(); + } + + database = context.getProperty(DB_NAME).evaluateAttributeExpressions(outgoingFlowFile).getValue(); + queryResultTimeunit = TimeUnit.valueOf(context.getProperty(INFLUX_DB_QUERY_RESULT_TIMEUNIT).evaluateAttributeExpressions(outgoingFlowFile).getValue()); + + try { + long startTimeMillis = System.currentTimeMillis(); + QueryResult result = executeQuery(context, database, query, queryResultTimeunit); + + String json = gson.toJson(result); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Query result {} ", new Object[] {result}); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(json.getBytes(charset)); + session.importFrom(bais, outgoingFlowFile); + bais.close(); + + final long endTimeMillis = System.currentTimeMillis(); + + if ( ! result.hasError() ) { + outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query)); + session.getProvenanceReporter().send(outgoingFlowFile, makeProvenanceUrl(context, database), + (endTimeMillis - startTimeMillis)); + session.transfer(outgoingFlowFile, REL_SUCCESS); + } else { + outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, result.getError()); + session.transfer(outgoingFlowFile, REL_FAILURE); + } + + } catch (Exception exception) { + outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage()); + if ( exception.getCause() instanceof SocketTimeoutException ) { + getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying", + new Object[]{exception.getCause().getLocalizedMessage()}, exception.getCause()); + session.transfer(outgoingFlowFile, REL_RETRY); + } else { + getLogger().error("Failed to read from InfluxDB due to {}", + new Object[]{exception.getLocalizedMessage()}, exception); + session.transfer(outgoingFlowFile, REL_FAILURE); + } + context.yield(); + } + } + + protected String getQuery(final ProcessSession session, Charset charset, FlowFile incomingFlowFile) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(incomingFlowFile, baos); + baos.close(); + return new String(baos.toByteArray(), charset); + } + + protected String makeProvenanceUrl(final ProcessContext context, String database) { + return new StringBuilder("influxdb://") + .append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/") + .append(database).toString(); + } + + protected QueryResult executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit) { + return getInfluxDB(context).query(new Query(query, database),timeunit); + } + + protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, String query, + String message) { + Map attributes = new HashMap<>(); + attributes.put(INFLUX_DB_ERROR_MESSAGE, String.valueOf(message)); + attributes.put(INFLUX_DB_EXECUTED_QUERY, String.valueOf(query)); + flowFile = session.putAllAttributes(flowFile, attributes); + return flowFile; + } + + @OnStopped + public void close() { + super.close(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java index f507768b4a..cbce66703d 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java @@ -29,6 +29,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -131,6 +132,7 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { super.onScheduled(context); + maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue(); } @Override diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 008a00ac57..919b7b95d8 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,3 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.influxdb.PutInfluxDB +org.apache.nifi.processors.influxdb.ExecuteInfluxDBQuery diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java new file mode 100644 index 0000000000..48ec1495dd --- /dev/null +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.util.TestRunner; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.After; + +/** + * Base integration test class for InfluxDB processors + */ +public class AbstractITInfluxDB { + protected TestRunner runner; + protected InfluxDB influxDB; + protected String dbName = "test"; + protected String dbUrl = "http://localhost:8086"; + protected String user = "admin"; + protected String password = "admin"; + protected static final String DEFAULT_RETENTION_POLICY = "autogen"; + + protected void initInfluxDB() throws InterruptedException, Exception { + influxDB = InfluxDBFactory.connect(dbUrl,user,password); + influxDB.createDatabase(dbName); + int max = 10; + while (!influxDB.databaseExists(dbName) && (max-- < 0)) { + Thread.sleep(5); + } + if ( ! influxDB.databaseExists(dbName) ) { + throw new Exception("unable to create database " + dbName); + } + } + + protected void cleanUpDatabase() throws InterruptedException { + if ( influxDB.databaseExists(dbName) ) { + QueryResult result = influxDB.query(new Query("DROP measurement water", dbName)); + checkError(result); + result = influxDB.query(new Query("DROP measurement testm", dbName)); + checkError(result); + result = influxDB.query(new Query("DROP database " + dbName, dbName)); + Thread.sleep(1000); + } + } + + protected void checkError(QueryResult result) { + if ( result.hasError() ) { + throw new IllegalStateException("Error while dropping measurements " + result.getError()); + } + } + + @After + public void tearDown() throws Exception { + runner = null; + if ( influxDB != null ) { + cleanUpDatabase(); + influxDB.close(); + } + } + + protected void initializeRunner() { + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, dbName); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, user); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, password); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, dbUrl); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.assertValid(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java new file mode 100644 index 0000000000..0a0844fc73 --- /dev/null +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.junit.Assert; + +import java.io.StringReader; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.influxdb.InfluxDB; +import org.influxdb.dto.QueryResult; +import org.influxdb.dto.QueryResult.Series; +import org.junit.Before; +import org.junit.Test; + +import com.google.gson.Gson; + +/** + * Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running + * on local host with default port and has database test with table test. Please set user + * and password if applicable before running the integration tests. + */ +public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB { + + protected Gson gson = new Gson(); + @Before + public void setUp() throws Exception { + initInfluxDB(); + runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class); + initializeRunner(); + runner.setVariable("influxDBUrl", "http://localhost:8086"); + runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}"); + } + + @Test + public void testValidScheduleQueryWithNoIncoming() { + String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + + String query = "select * from water"; + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); + + runner.setIncomingConnection(false); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + Series series = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); + } + + @Test + public void testValidSinglePoint() { + String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + String query = "select * from water"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + Series series = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); + } + + @Test + public void testShowDatabases() { + String query = "show databases"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + String result = new String(flowFiles.get(0).toByteArray()); + QueryResult queryResult = gson.fromJson(new StringReader(result), QueryResult.class); + Series series = queryResult.getResults().get(0).getSeries().get(0); + assertEquals("series name should be same", "databases", series.getName()); + assertEquals("series column should be same", "name", series.getColumns().get(0)); + boolean internal = series.getValues().get(0).stream().anyMatch(o -> o.equals("_internal")); + Assert.assertTrue("content should contain _internal " + queryResult, internal); + boolean test = series.getValues().stream().flatMap(i -> ((List)i).stream()).anyMatch(o -> o.equals("test")); + Assert.assertTrue("content should contain test " + queryResult, test); + } + + @Test + public void testCreateDB() { + String query = "create database test1"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + assertNotNull("QueryResult should not be null", queryResult.getResults()); + assertEquals("results array should be same size", 1, queryResult.getResults().size()); + assertNull("No series", queryResult.getResults().get(0).getSeries()); + } + + @Test + public void testEmptyFlowFileQueryWithScheduledQuery() { + String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + + String query = "select * from water"; + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); + + byte [] bytes = new byte [] {}; + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + assertNotNull("QueryResult should not be null", queryResult.getResults()); + assertEquals("results array should be same size", 1, queryResult.getResults().size()); + Series series = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); + } + + @Test + public void testEmptyFlowFileQueryWithScheduledQueryEL() { + String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + + String query = "select * from ${measurement}"; + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); + + byte [] bytes = new byte [] {}; + Map properties = new HashMap<>(); + properties.put("measurement","water"); + runner.enqueue(bytes, properties); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null",flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query.replace("${measurement}", "water"), flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + assertNotNull("QueryResult should not be null", queryResult.getResults()); + assertEquals("results array should be same size", 1, queryResult.getResults().size()); + Series series = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0); + } + + protected void validateSeries(String name, List columns, List values, String city, double rain) { + assertEquals("Series name should be same","water", name); + assertEquals("Series columns should be same","time", columns.get(0)); + assertEquals("Series columns should be same","city", columns.get(1)); + assertEquals("Series columns should be same","country", columns.get(2)); + assertEquals("Series columns should be same","humidity", columns.get(3)); + assertEquals("Series columns should be same","rain", columns.get(4)); + + assertEquals("time value should be same", 1.50100227485666867E18, values.get(0)); + assertEquals("city value should be same", city, values.get(1)); + assertEquals("contry value should be same", "US", values.get(2)); + assertEquals("humidity value should be same", 0.6, values.get(3)); + assertEquals("rain value should be same", rain, values.get(4)); + } + + @Test + public void testEmptyFlowFileQuery() { + String query = ""; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); + List flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 0, flowFilesSuccess.size()); + List flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); + assertEquals("Value should be equal", 1, flowFilesFailure.size()); + assertEquals("Value should be equal","FlowFile query is empty and no scheduled query is set", flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertNull("Value should be null", flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + } + + @Test + public void testNoFlowFileNoScheduledInfluxDBQuery() { + try { + runner.setIncomingConnection(false); + runner.run(1,true,true); + Assert.fail("Should throw assertion error"); + } catch(AssertionError error) { + assertEquals("Message should be same", + "Could not invoke methods annotated with @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException", + error.getLocalizedMessage()); + } + } + + @Test + public void testValidTwoPoints() { + String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652" + + System.lineSeparator() + + "water,country=US,city=nyc rain=2,humidity=0.6 1501002274856668652"; + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + String query = "select * from water"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + assertNotNull("QueryResult should not be null", queryResult.getResults()); + assertEquals("results array should be same size", 1, queryResult.getResults().size()); + assertEquals("Series size should be same",1, queryResult.getResults().get(0).getSeries().size()); + Series series1 = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series1.getName(),series1.getColumns(), series1.getValues().get(0),"newark",1.0); + + Series series2 = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series2.getName(),series2.getColumns(), series2.getValues().get(1),"nyc",2.0); + } + + @Test + public void testMalformedQuery() { + String query = "select * from"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); + List flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 0, flowFilesSuccess.size()); + List flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); + assertEquals("Value should be equal", 1, flowFilesFailure.size()); + assertEquals("Value should be equal","{\"error\":\"error parsing query: found EOF, expected identifier at line 1, char 15\"}", + flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE).trim()); + assertEquals("Value should be equal",query, flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + } + + @Test + public void testQueryResultHasError() { + ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + QueryResult result = super.executeQuery(context, database, query, timeunit); + result.setError("Test Error"); + return result; + } + + }; + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + initializeRunner(); + + byte [] bytes = "select * from /.*/".getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); + + assertEquals("Test Error",flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + } + + @Test + public void testValidSameTwoPoints() { + String message = "water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652" + + System.lineSeparator() + + "water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652"; + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + String query = "select * from water"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); + + QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); + assertNotNull("QueryResult should not be null", queryResult.getResults()); + assertEquals("Result size should be same", 1, queryResult.getResults().size()); + Series series = queryResult.getResults().get(0).getSeries().get(0); + validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"nyc",1.0); + } + + @Test + public void testValidTwoPointsUrlEL() { + runner.setVariable("influxDBUrl", "http://localhost:8086"); + runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}"); + testValidTwoPoints(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java index 8db743dce4..ed5a7e101c 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java @@ -18,13 +18,9 @@ package org.apache.nifi.processors.influxdb; import static org.junit.Assert.assertEquals; import java.util.List; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.influxdb.InfluxDB; -import org.influxdb.InfluxDBFactory; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -33,63 +29,22 @@ import org.junit.Test; * on local host with default port and has database test with table test. Please set user * and password if applicable before running the integration tests. */ -public class ITPutInfluxDB { - - private TestRunner runner; - private InfluxDB influxDB; - private String dbName = "test"; - private String dbUrl = "http://localhost:8086"; - private String user = "admin"; - private String password = "admin"; +public class ITPutInfluxDB extends AbstractITInfluxDB { @Before public void setUp() throws Exception { runner = TestRunners.newTestRunner(PutInfluxDB.class); - runner.setProperty(PutInfluxDB.DB_NAME, dbName); - runner.setProperty(PutInfluxDB.USERNAME, user); - runner.setProperty(PutInfluxDB.PASSWORD, password); - runner.setProperty(PutInfluxDB.INFLUX_DB_URL, dbUrl); - runner.setProperty(PutInfluxDB.CHARSET, "UTF-8"); + initializeRunner(); runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); - runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); + runner.setProperty(PutInfluxDB.RETENTION_POLICY, DEFAULT_RETENTION_POLICY); runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); runner.assertValid(); - influxDB = InfluxDBFactory.connect(dbUrl,user,password); - if ( influxDB.databaseExists(dbName) ) { - QueryResult result = influxDB.query(new Query("DROP measurement water", dbName)); - checkError(result); - result = influxDB.query(new Query("DROP measurement testm", dbName)); - checkError(result); - result = influxDB.query(new Query("DROP database " + dbName, dbName)); - Thread.sleep(1000); - } - influxDB.createDatabase(dbName); - int max = 10; - while (!influxDB.databaseExists(dbName) && (max-- < 0)) { - Thread.sleep(5); - } - if ( ! influxDB.databaseExists(dbName) ) { - throw new Exception("unable to create database " + dbName); - } - } - - protected void checkError(QueryResult result) { - if ( result.hasError() ) { - throw new IllegalStateException("Error while dropping measurements " + result.getError()); - } - } - - @After - public void tearDown() throws Exception { - runner = null; - if ( influxDB != null ) { - influxDB.close(); - } + initInfluxDB(); } @Test public void testValidSinglePoint() { - String message = "water,country=US,city=newark rain=1,humidity=0.6"; + String message = "water,country=US,city=newark rain=1,humidity=0.6 "; byte [] bytes = message.getBytes(); runner.enqueue(bytes); runner.run(1,true,true); diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java new file mode 100644 index 0000000000..dfed7009ba --- /dev/null +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.influxdb.InfluxDB; +import org.influxdb.dto.QueryResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestExecutetInfluxDBQuery { + private TestRunner runner; + private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery; + + @Before + public void setUp() throws Exception { + mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { + return null; + } + + @Override + protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + return null; + } + + }; + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "user"); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "password"); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + @Test + public void testDefaultValid() { + runner.assertValid(); + } + + @Test + public void testQueryThrowsRuntimeException() { + mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { + return null; + } + + @Override + protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + throw new RuntimeException("runtime exception"); + } + + }; + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); + runner.assertValid(); + + byte [] bytes = "select * from /.*/".getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); + + assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception"); + } + + @Test + public void testQueryThrowsRuntimeExceptionWithSocketTimeoutException() { + mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { + return null; + } + + @Override + protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + throw new RuntimeException("runtime exception", new SocketTimeoutException("timeout")); + } + + }; + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); + runner.assertValid(); + + byte [] bytes = "select * from /.*/".getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_RETRY, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_RETRY); + + assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception"); + } + + @Test(expected=ProcessException.class) + public void testMakingQueryThrowsIOException() throws Throwable { + mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { + return null; + } + + @Override + protected String getQuery(ProcessSession session, Charset charset, FlowFile incomingFlowFile) + throws IOException { + throw new IOException("Test IOException"); + } + }; + + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); + runner.assertValid(); + + byte [] bytes = "select * from /.*/".getBytes(); + runner.enqueue(bytes); + try { + runner.run(1,true,true); + } catch (AssertionError e) { + throw e.getCause(); + } + } + + @Test + public void testMakeConnectionThrowsRuntimeException() { + mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { + throw new RuntimeException("testException"); + } + + }; + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); + runner.assertValid(); + + byte [] bytes = "select * from /.*/".getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); + + assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"Error while getting connection testException"); + } + + @Test + public void testTriggerThrowsException() { + mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { + @Override + protected InfluxDB getInfluxDB(ProcessContext context) { + throw new RuntimeException("testException"); + } + }; + runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery); + runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test"); + runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1"); + runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1"); + runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8"); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl"); + runner.assertValid(); + + byte [] bytes = "select * from".getBytes(); + runner.enqueue(bytes); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1); + + List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE); + + assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"testException"); + } + +} \ No newline at end of file