NIFI-4927 - InfluxDB Query Processor

This closes #2562

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
mans2singh 2018-03-17 18:50:08 -07:00 committed by Mike Thomsen
parent b2fae5f56e
commit 4366c67b27
9 changed files with 917 additions and 61 deletions

View File

@ -56,6 +56,11 @@
<artifactId>slf4j-simple</artifactId> <artifactId>slf4j-simple</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -15,31 +15,27 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.influxdb; package org.apache.nifi.processors.influxdb;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.InfluxDB; import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory; import org.influxdb.InfluxDBFactory;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.OkHttpClient.Builder; import okhttp3.OkHttpClient.Builder;
/** /**
* Abstract base class for InfluxDB processors * Abstract base class for InfluxDB processors
*/ */
abstract class AbstractInfluxDBProcessor extends AbstractProcessor { public abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("influxdb-charset") .name("influxdb-charset")
.displayName("Character Set") .displayName("Character Set")
.description("Specifies the character set of the document data.") .description("Specifies the character set of the document data.")
@ -96,7 +92,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.sensitive(true) .sensitive(true)
.build(); .build();
protected static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder()
.name("influxdb-max-records-size") .name("influxdb-max-records-size")
.displayName("Max size of records") .displayName("Max size of records")
.description("Maximum size of records allowed to be posted in one batch") .description("Maximum size of records allowed to be posted in one batch")
@ -107,7 +103,6 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.build(); .build();
public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message"; public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message";
protected AtomicReference<InfluxDB> influxDB = new AtomicReference<>(); protected AtomicReference<InfluxDB> influxDB = new AtomicReference<>();
protected long maxRecordsSize; protected long maxRecordsSize;
@ -121,12 +116,11 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS); long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue(); String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue();
try { try {
influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout)); influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout));
} catch(Exception e) { } catch(Exception e) {
getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() },e); getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() },e);
throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e); throw new RuntimeException("Error while getting connection " + e.getLocalizedMessage(),e);
} }
getLogger().info("InfluxDB connection created for host {}", getLogger().info("InfluxDB connection created for host {}",
new Object[] {influxDbUrl}); new Object[] {influxDbUrl});
@ -136,7 +130,6 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
} }
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) { protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {

View File

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")
@WritesAttributes({
@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"),
})
public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder()
.name("influxdb-query-result-time-unit")
.displayName("Query Result Time Units")
.description("The time unit of query results from the InfluxDB")
.defaultValue(TimeUnit.NANOSECONDS.name())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet()))
.sensitive(false)
.build();
public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder()
.name("influxdb-query")
.displayName("InfluxDB Query")
.description("The InfluxDB query to execute. "
+ "Note: If there are incoming connections, then the query is created from incoming FlowFile's content otherwise"
+ " it is created from this property.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successful InfluxDB queries are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Falied InfluxDB queries are routed to this relationship").build();
static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("Failed queries that are retryable exception are routed to this relationship").build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
protected Gson gson = new Gson();
static {
final Set<Relationship> tempRelationships = new HashSet<>();
tempRelationships.add(REL_SUCCESS);
tempRelationships.add(REL_FAILURE);
tempRelationships.add(REL_RETRY);
relationships = Collections.unmodifiableSet(tempRelationships);
final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
tempDescriptors.add(DB_NAME);
tempDescriptors.add(INFLUX_DB_URL);
tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT);
tempDescriptors.add(INFLUX_DB_QUERY);
tempDescriptors.add(USERNAME);
tempDescriptors.add(PASSWORD);
tempDescriptors.add(CHARSET);
propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
// Either input connection or scheduled query is required
if ( ! context.getProperty(INFLUX_DB_QUERY).isSet()
&& ! context.hasIncomingConnection() ) {
String error = "The InfluxDB Query processor requires input connection or scheduled InfluxDB query";
getLogger().error(error);
throw new ProcessException(error);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
String query = null;
String database = null;
TimeUnit queryResultTimeunit = null;
Charset charset = null;
FlowFile outgoingFlowFile = null;
// If there are incoming connections, prepare query params from flow file
if ( context.hasIncomingConnection() ) {
FlowFile incomingFlowFile = session.get();
if ( incomingFlowFile == null && context.hasNonLoopConnection() ) {
return;
}
charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingFlowFile).getValue());
if ( incomingFlowFile.getSize() == 0 ) {
if ( context.getProperty(INFLUX_DB_QUERY).isSet() ) {
query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue();
} else {
String message = "FlowFile query is empty and no scheduled query is set";
getLogger().error(message);
incomingFlowFile = session.putAttribute(incomingFlowFile, INFLUX_DB_ERROR_MESSAGE, message);
session.transfer(incomingFlowFile, REL_FAILURE);
return;
}
} else {
try {
query = getQuery(session, charset, incomingFlowFile);
} catch(IOException ioe) {
getLogger().error("Exception while reading from FlowFile " + ioe.getLocalizedMessage(), ioe);
throw new ProcessException(ioe);
}
}
outgoingFlowFile = incomingFlowFile;
} else {
outgoingFlowFile = session.create();
charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(outgoingFlowFile).getValue());
query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue();
}
database = context.getProperty(DB_NAME).evaluateAttributeExpressions(outgoingFlowFile).getValue();
queryResultTimeunit = TimeUnit.valueOf(context.getProperty(INFLUX_DB_QUERY_RESULT_TIMEUNIT).evaluateAttributeExpressions(outgoingFlowFile).getValue());
try {
long startTimeMillis = System.currentTimeMillis();
QueryResult result = executeQuery(context, database, query, queryResultTimeunit);
String json = gson.toJson(result);
if ( getLogger().isDebugEnabled() ) {
getLogger().debug("Query result {} ", new Object[] {result});
}
ByteArrayInputStream bais = new ByteArrayInputStream(json.getBytes(charset));
session.importFrom(bais, outgoingFlowFile);
bais.close();
final long endTimeMillis = System.currentTimeMillis();
if ( ! result.hasError() ) {
outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
session.getProvenanceReporter().send(outgoingFlowFile, makeProvenanceUrl(context, database),
(endTimeMillis - startTimeMillis));
session.transfer(outgoingFlowFile, REL_SUCCESS);
} else {
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, result.getError());
session.transfer(outgoingFlowFile, REL_FAILURE);
}
} catch (Exception exception) {
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage());
if ( exception.getCause() instanceof SocketTimeoutException ) {
getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying",
new Object[]{exception.getCause().getLocalizedMessage()}, exception.getCause());
session.transfer(outgoingFlowFile, REL_RETRY);
} else {
getLogger().error("Failed to read from InfluxDB due to {}",
new Object[]{exception.getLocalizedMessage()}, exception);
session.transfer(outgoingFlowFile, REL_FAILURE);
}
context.yield();
}
}
protected String getQuery(final ProcessSession session, Charset charset, FlowFile incomingFlowFile)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(incomingFlowFile, baos);
baos.close();
return new String(baos.toByteArray(), charset);
}
protected String makeProvenanceUrl(final ProcessContext context, String database) {
return new StringBuilder("influxdb://")
.append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/")
.append(database).toString();
}
protected QueryResult executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit) {
return getInfluxDB(context).query(new Query(query, database),timeunit);
}
protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, String query,
String message) {
Map<String,String> attributes = new HashMap<>();
attributes.put(INFLUX_DB_ERROR_MESSAGE, String.valueOf(message));
attributes.put(INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
flowFile = session.putAllAttributes(flowFile, attributes);
return flowFile;
}
@OnStopped
public void close() {
super.close();
}
}

View File

@ -29,6 +29,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
@ -131,6 +132,7 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
super.onScheduled(context); super.onScheduled(context);
maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
} }
@Override @Override

View File

@ -13,3 +13,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.influxdb.PutInfluxDB org.apache.nifi.processors.influxdb.PutInfluxDB
org.apache.nifi.processors.influxdb.ExecuteInfluxDBQuery

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.util.TestRunner;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.After;
/**
* Base integration test class for InfluxDB processors
*/
public class AbstractITInfluxDB {
protected TestRunner runner;
protected InfluxDB influxDB;
protected String dbName = "test";
protected String dbUrl = "http://localhost:8086";
protected String user = "admin";
protected String password = "admin";
protected static final String DEFAULT_RETENTION_POLICY = "autogen";
protected void initInfluxDB() throws InterruptedException, Exception {
influxDB = InfluxDBFactory.connect(dbUrl,user,password);
influxDB.createDatabase(dbName);
int max = 10;
while (!influxDB.databaseExists(dbName) && (max-- < 0)) {
Thread.sleep(5);
}
if ( ! influxDB.databaseExists(dbName) ) {
throw new Exception("unable to create database " + dbName);
}
}
protected void cleanUpDatabase() throws InterruptedException {
if ( influxDB.databaseExists(dbName) ) {
QueryResult result = influxDB.query(new Query("DROP measurement water", dbName));
checkError(result);
result = influxDB.query(new Query("DROP measurement testm", dbName));
checkError(result);
result = influxDB.query(new Query("DROP database " + dbName, dbName));
Thread.sleep(1000);
}
}
protected void checkError(QueryResult result) {
if ( result.hasError() ) {
throw new IllegalStateException("Error while dropping measurements " + result.getError());
}
}
@After
public void tearDown() throws Exception {
runner = null;
if ( influxDB != null ) {
cleanUpDatabase();
influxDB.close();
}
}
protected void initializeRunner() {
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, dbName);
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, user);
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, password);
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, dbUrl);
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.assertValid();
}
}

View File

@ -0,0 +1,329 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Assert;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import org.junit.Before;
import org.junit.Test;
import com.google.gson.Gson;
/**
* Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running
* on local host with default port and has database test with table test. Please set user
* and password if applicable before running the integration tests.
*/
public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
protected Gson gson = new Gson();
@Before
public void setUp() throws Exception {
initInfluxDB();
runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class);
initializeRunner();
runner.setVariable("influxDBUrl", "http://localhost:8086");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
}
@Test
public void testValidScheduleQueryWithNoIncoming() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
runner.setIncomingConnection(false);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
@Test
public void testValidSinglePoint() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
@Test
public void testShowDatabases() {
String query = "show databases";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
String result = new String(flowFiles.get(0).toByteArray());
QueryResult queryResult = gson.fromJson(new StringReader(result), QueryResult.class);
Series series = queryResult.getResults().get(0).getSeries().get(0);
assertEquals("series name should be same", "databases", series.getName());
assertEquals("series column should be same", "name", series.getColumns().get(0));
boolean internal = series.getValues().get(0).stream().anyMatch(o -> o.equals("_internal"));
Assert.assertTrue("content should contain _internal " + queryResult, internal);
boolean test = series.getValues().stream().flatMap(i -> ((List<Object>)i).stream()).anyMatch(o -> o.equals("test"));
Assert.assertTrue("content should contain test " + queryResult, test);
}
@Test
public void testCreateDB() {
String query = "create database test1";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull("QueryResult should not be null", queryResult.getResults());
assertEquals("results array should be same size", 1, queryResult.getResults().size());
assertNull("No series", queryResult.getResults().get(0).getSeries());
}
@Test
public void testEmptyFlowFileQueryWithScheduledQuery() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
byte [] bytes = new byte [] {};
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull("QueryResult should not be null", queryResult.getResults());
assertEquals("results array should be same size", 1, queryResult.getResults().size());
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
@Test
public void testEmptyFlowFileQueryWithScheduledQueryEL() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from ${measurement}";
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
byte [] bytes = new byte [] {};
Map<String,String> properties = new HashMap<>();
properties.put("measurement","water");
runner.enqueue(bytes, properties);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null",flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query.replace("${measurement}", "water"), flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull("QueryResult should not be null", queryResult.getResults());
assertEquals("results array should be same size", 1, queryResult.getResults().size());
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"newark",1.0);
}
protected void validateSeries(String name, List<String> columns, List<Object> values, String city, double rain) {
assertEquals("Series name should be same","water", name);
assertEquals("Series columns should be same","time", columns.get(0));
assertEquals("Series columns should be same","city", columns.get(1));
assertEquals("Series columns should be same","country", columns.get(2));
assertEquals("Series columns should be same","humidity", columns.get(3));
assertEquals("Series columns should be same","rain", columns.get(4));
assertEquals("time value should be same", 1.50100227485666867E18, values.get(0));
assertEquals("city value should be same", city, values.get(1));
assertEquals("contry value should be same", "US", values.get(2));
assertEquals("humidity value should be same", 0.6, values.get(3));
assertEquals("rain value should be same", rain, values.get(4));
}
@Test
public void testEmptyFlowFileQuery() {
String query = "";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 0, flowFilesSuccess.size());
List<MockFlowFile> flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals("Value should be equal", 1, flowFilesFailure.size());
assertEquals("Value should be equal","FlowFile query is empty and no scheduled query is set", flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertNull("Value should be null", flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
}
@Test
public void testNoFlowFileNoScheduledInfluxDBQuery() {
try {
runner.setIncomingConnection(false);
runner.run(1,true,true);
Assert.fail("Should throw assertion error");
} catch(AssertionError error) {
assertEquals("Message should be same",
"Could not invoke methods annotated with @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException",
error.getLocalizedMessage());
}
}
@Test
public void testValidTwoPoints() {
String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652" +
System.lineSeparator() +
"water,country=US,city=nyc rain=2,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull("QueryResult should not be null", queryResult.getResults());
assertEquals("results array should be same size", 1, queryResult.getResults().size());
assertEquals("Series size should be same",1, queryResult.getResults().get(0).getSeries().size());
Series series1 = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series1.getName(),series1.getColumns(), series1.getValues().get(0),"newark",1.0);
Series series2 = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series2.getName(),series2.getColumns(), series2.getValues().get(1),"nyc",2.0);
}
@Test
public void testMalformedQuery() {
String query = "select * from";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFilesSuccess = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 0, flowFilesSuccess.size());
List<MockFlowFile> flowFilesFailure = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals("Value should be equal", 1, flowFilesFailure.size());
assertEquals("Value should be equal","{\"error\":\"error parsing query: found EOF, expected identifier at line 1, char 15\"}",
flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE).trim());
assertEquals("Value should be equal",query, flowFilesFailure.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
}
@Test
public void testQueryResultHasError() {
ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
QueryResult result = super.executeQuery(context, database, query, timeunit);
result.setError("Test Error");
return result;
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
initializeRunner();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals("Test Error",flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
}
@Test
public void testValidSameTwoPoints() {
String message = "water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652" +
System.lineSeparator() +
"water,country=US,city=nyc rain=1,humidity=0.6 1501002274856668652";
influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
String query = "select * from water";
byte [] bytes = query.getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
assertEquals("Value should be equal", 1, flowFiles.size());
assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class);
assertNotNull("QueryResult should not be null", queryResult.getResults());
assertEquals("Result size should be same", 1, queryResult.getResults().size());
Series series = queryResult.getResults().get(0).getSeries().get(0);
validateSeries(series.getName(), series.getColumns(), series.getValues().get(0),"nyc",1.0);
}
@Test
public void testValidTwoPointsUrlEL() {
runner.setVariable("influxDBUrl", "http://localhost:8086");
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
testValidTwoPoints();
}
}

View File

@ -18,13 +18,9 @@ package org.apache.nifi.processors.influxdb;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.List; import java.util.List;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query; import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -33,63 +29,22 @@ import org.junit.Test;
* on local host with default port and has database test with table test. Please set user * on local host with default port and has database test with table test. Please set user
* and password if applicable before running the integration tests. * and password if applicable before running the integration tests.
*/ */
public class ITPutInfluxDB { public class ITPutInfluxDB extends AbstractITInfluxDB {
private TestRunner runner;
private InfluxDB influxDB;
private String dbName = "test";
private String dbUrl = "http://localhost:8086";
private String user = "admin";
private String password = "admin";
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
runner = TestRunners.newTestRunner(PutInfluxDB.class); runner = TestRunners.newTestRunner(PutInfluxDB.class);
runner.setProperty(PutInfluxDB.DB_NAME, dbName); initializeRunner();
runner.setProperty(PutInfluxDB.USERNAME, user);
runner.setProperty(PutInfluxDB.PASSWORD, password);
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, dbUrl);
runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue()); runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen"); runner.setProperty(PutInfluxDB.RETENTION_POLICY, DEFAULT_RETENTION_POLICY);
runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB"); runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
runner.assertValid(); runner.assertValid();
influxDB = InfluxDBFactory.connect(dbUrl,user,password); initInfluxDB();
if ( influxDB.databaseExists(dbName) ) {
QueryResult result = influxDB.query(new Query("DROP measurement water", dbName));
checkError(result);
result = influxDB.query(new Query("DROP measurement testm", dbName));
checkError(result);
result = influxDB.query(new Query("DROP database " + dbName, dbName));
Thread.sleep(1000);
}
influxDB.createDatabase(dbName);
int max = 10;
while (!influxDB.databaseExists(dbName) && (max-- < 0)) {
Thread.sleep(5);
}
if ( ! influxDB.databaseExists(dbName) ) {
throw new Exception("unable to create database " + dbName);
}
}
protected void checkError(QueryResult result) {
if ( result.hasError() ) {
throw new IllegalStateException("Error while dropping measurements " + result.getError());
}
}
@After
public void tearDown() throws Exception {
runner = null;
if ( influxDB != null ) {
influxDB.close();
}
} }
@Test @Test
public void testValidSinglePoint() { public void testValidSinglePoint() {
String message = "water,country=US,city=newark rain=1,humidity=0.6"; String message = "water,country=US,city=newark rain=1,humidity=0.6 ";
byte [] bytes = message.getBytes(); byte [] bytes = message.getBytes();
runner.enqueue(bytes); runner.enqueue(bytes);
runner.run(1,true,true); runner.run(1,true,true);

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestExecutetInfluxDBQuery {
private TestRunner runner;
private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery;
@Before
public void setUp() throws Exception {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
return null;
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "user");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "password");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.assertValid();
}
@After
public void tearDown() throws Exception {
runner = null;
}
@Test
public void testDefaultValid() {
runner.assertValid();
}
@Test
public void testQueryThrowsRuntimeException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
throw new RuntimeException("runtime exception");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception");
}
@Test
public void testQueryThrowsRuntimeExceptionWithSocketTimeoutException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
throw new RuntimeException("runtime exception", new SocketTimeoutException("timeout"));
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_RETRY, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_RETRY);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"runtime exception");
}
@Test(expected=ProcessException.class)
public void testMakingQueryThrowsIOException() throws Throwable {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
return null;
}
@Override
protected String getQuery(ProcessSession session, Charset charset, FlowFile incomingFlowFile)
throws IOException {
throw new IOException("Test IOException");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
try {
runner.run(1,true,true);
} catch (AssertionError e) {
throw e.getCause();
}
}
@Test
public void testMakeConnectionThrowsRuntimeException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from /.*/".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"Error while getting connection testException");
}
@Test
public void testTriggerThrowsException() {
mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected InfluxDB getInfluxDB(ProcessContext context) {
throw new RuntimeException("testException");
}
};
runner = TestRunners.newTestRunner(mockExecuteInfluxDBQuery);
runner.setProperty(ExecuteInfluxDBQuery.DB_NAME, "test");
runner.setProperty(ExecuteInfluxDBQuery.USERNAME, "u1");
runner.setProperty(ExecuteInfluxDBQuery.PASSWORD, "p1");
runner.setProperty(ExecuteInfluxDBQuery.CHARSET, "UTF-8");
runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_URL, "http://dbUrl");
runner.assertValid();
byte [] bytes = "select * from".getBytes();
runner.enqueue(bytes);
runner.run(1,true,true);
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_FAILURE);
assertEquals(flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE),"testException");
}
}