NIFI-5130 ExecuteInfluxDBQuery processor chunking support

This closes #2666

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Michal Misiewicz 2018-04-29 10:01:34 +02:00 committed by Mike Thomsen
parent 3ad3243511
commit ed30bb9b78
4 changed files with 135 additions and 28 deletions

View File

@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -32,9 +33,11 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -48,6 +51,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -64,6 +69,8 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
private static final int DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE = 0;
public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder()
.name("influxdb-query-result-time-unit")
.displayName("Query Result Time Units")
@ -86,6 +93,20 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Integer MAX_CHUNK_SIZE = 10000;
public static final PropertyDescriptor INFLUX_DB_QUERY_CHUNK_SIZE = new PropertyDescriptor.Builder()
.name("influxdb-query-chunk-size")
.displayName("Results chunk size")
.description("Chunking can be used to return results in a stream of smaller batches "
+ "(each has a partial results up to a chunk size) rather than as a single response. "
+ "Chunking queries can return an unlimited number of rows. Note: Chunking is enable when result chunk size is greater than 0")
.defaultValue(String.valueOf(DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.createLongValidator(0, MAX_CHUNK_SIZE, true))
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successful InfluxDB queries are routed to this relationship").build();
@ -111,6 +132,7 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT);
tempDescriptors.add(INFLUX_DB_QUERY);
tempDescriptors.add(INFLUX_DB_QUERY_CHUNK_SIZE);
tempDescriptors.add(USERNAME);
tempDescriptors.add(PASSWORD);
tempDescriptors.add(CHARSET);
@ -189,9 +211,10 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
try {
long startTimeMillis = System.currentTimeMillis();
QueryResult result = executeQuery(context, database, query, queryResultTimeunit);
int chunkSize = context.getProperty(INFLUX_DB_QUERY_CHUNK_SIZE).evaluateAttributeExpressions(outgoingFlowFile).asInteger();
List<QueryResult> result = executeQuery(context, database, query, queryResultTimeunit, chunkSize);
String json = gson.toJson(result);
String json = result.size() == 1 ? gson.toJson(result.get(0)) : gson.toJson(result);
if ( getLogger().isDebugEnabled() ) {
getLogger().debug("Query result {} ", new Object[] {result});
@ -203,13 +226,13 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
final long endTimeMillis = System.currentTimeMillis();
if ( ! result.hasError() ) {
if ( ! hasErrors(result) ) {
outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
session.getProvenanceReporter().send(outgoingFlowFile, makeProvenanceUrl(context, database),
(endTimeMillis - startTimeMillis));
session.transfer(outgoingFlowFile, REL_SUCCESS);
} else {
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, result.getError());
outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, queryErrors(result));
session.transfer(outgoingFlowFile, REL_FAILURE);
}
@ -242,8 +265,31 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
.append(database).toString();
}
protected QueryResult executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit) {
return getInfluxDB(context).query(new Query(query, database),timeunit);
/**
 * Executes the query against InfluxDB, optionally in chunked mode.
 *
 * @param context  process context used to resolve the InfluxDB connection
 * @param database target database name
 * @param query    the InfluxQL query to run
 * @param timeunit time unit applied to result timestamps (non-chunked mode only)
 * @param chunkSize number of rows per chunk; chunking is enabled when greater than 0
 * @return the query results; a single-element list in non-chunked mode,
 *         otherwise one element per received chunk
 * @throws InterruptedException if interrupted while waiting for the chunked query to complete
 */
protected List<QueryResult> executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit,
        int chunkSize) throws InterruptedException {
    final InfluxDB influx = getInfluxDB(context);
    final Query influxQuery = new Query(query, database);

    if (chunkSize > 0) {
        // Chunked mode: the driver delivers results asynchronously, one QueryResult
        // per chunk, and marks completion with a sentinel result whose error is "DONE".
        final List<QueryResult> results = new LinkedList<>();
        final CountDownLatch latch = new CountDownLatch(1);
        influx.query(influxQuery, chunkSize, result -> {
            if (isQueryDone(result.getError())) {
                latch.countDown();
            } else {
                results.add(result);
            }
        });
        // await() also establishes the happens-before edge that makes the
        // callback-thread writes to 'results' visible to this thread.
        latch.await();
        return results;
    } else {
        return Collections.singletonList(influx.query(influxQuery, timeunit));
    }
}
/**
 * Returns true when the given error message is the InfluxDB client's
 * "DONE" sentinel, which marks the final callback of a chunked query.
 */
private boolean isQueryDone(String error) {
    return "DONE".equals(error);
}
protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, String query,
@ -255,6 +301,22 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
return flowFile;
}
/**
 * Checks whether any of the returned query results carries an error.
 */
private Boolean hasErrors(List<QueryResult> results) {
    return results.stream().anyMatch(QueryResult::hasError);
}
/**
 * Concatenates the error messages of all failed query results,
 * separated by newlines.
 */
private String queryErrors(List<QueryResult> results) {
    final StringBuilder errors = new StringBuilder();
    for (QueryResult result : results) {
        if (result.hasError()) {
            if (errors.length() > 0) {
                errors.append("\n");
            }
            errors.append(result.getError());
        }
    }
    return errors.toString();
}
@OnStopped
public void close() {
super.close();

View File

@ -15,6 +15,8 @@
* limitations under the License.
*/
package org.apache.nifi.processors.influxdb;
import com.google.gson.reflect.TypeToken;
import org.apache.nifi.util.TestRunner;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
@ -22,6 +24,9 @@ import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.junit.After;
import java.lang.reflect.Type;
import java.util.List;
/**
* Base integration test class for InfluxDB processors
*/
@ -34,6 +39,8 @@ public class AbstractITInfluxDB {
protected String password = "admin";
protected static final String DEFAULT_RETENTION_POLICY = "autogen";
protected Type QueryResultListType = new TypeToken<List<QueryResult>>(){}.getType();
protected void initInfluxDB() throws InterruptedException, Exception {
influxDB = InfluxDBFactory.connect(dbUrl,user,password);
influxDB.createDatabase(dbName);
@ -52,6 +59,8 @@ public class AbstractITInfluxDB {
checkError(result);
result = influxDB.query(new Query("DROP measurement testm", dbName));
checkError(result);
result = influxDB.query(new Query("DROP measurement chunkedQueryTest", dbName));
checkError(result);
result = influxDB.query(new Query("DROP database " + dbName, dbName));
Thread.sleep(1000);
}

View File

@ -16,28 +16,31 @@
*/
package org.apache.nifi.processors.influxdb;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import org.junit.Assert;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import com.google.gson.Gson;
/**
* Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running
* on local host with default port and has database test with table test. Please set user
@ -274,12 +277,12 @@ public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
}
@Test
public void testQueryResultHasError() {
public void testQueryResultHasError() throws Throwable {
ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() {
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
QueryResult result = super.executeQuery(context, database, query, timeunit);
result.setError("Test Error");
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) throws InterruptedException{
List<QueryResult> result = super.executeQuery(context, database, query, timeunit, chunkSize);
result.get(0).setError("Test Error");
return result;
}
@ -326,4 +329,37 @@ public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
testValidTwoPoints();
}
/**
 * Verifies that a chunked query (chunk size 2) over three points is returned
 * as a JSON array of two QueryResult chunks: 2 rows then 1 row.
 */
@Test
public void testChunkedQuery() {
    // Three points with distinct timestamps so the query yields 3 rows.
    String message =
            "chunkedQueryTest,country=GB value=1 1524938495000000000" + System.lineSeparator() +
            "chunkedQueryTest,country=PL value=2 1524938505000000000" + System.lineSeparator() +
            "chunkedQueryTest,country=US value=3 1524938505800000000";
    influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);

    String query = "select * from chunkedQueryTest";
    runner.enqueue(query.getBytes());
    runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY_CHUNK_SIZE, "2");
    runner.run(1, true, true);

    runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
    assertEquals("Value should be equal", 1, flowFiles.size());
    assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));

    // Chunked results are serialized as a JSON array of QueryResult objects.
    List<QueryResult> queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResultListType);
    assertNotNull("QueryResult array should not be null", queryResult);
    assertEquals("QueryResult array size should be equal 2", 2, queryResult.size());
    assertEquals("First chunk should have 2 elements", 2, chunkSize(queryResult.get(0)));
    assertEquals("Second chunk should have 1 element", 1, chunkSize(queryResult.get(1)));
}
/**
 * Returns the number of rows in the first series of the given result chunk.
 */
private int chunkSize(QueryResult queryResult) {
    Series firstSeries = queryResult.getResults().get(0).getSeries().get(0);
    return firstSeries.getValues().size();
}
}

View File

@ -16,14 +16,6 @@
*/
package org.apache.nifi.processors.influxdb;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -37,6 +29,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class TestExecutetInfluxDBQuery {
private TestRunner runner;
private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery;
@ -50,7 +50,7 @@ public class TestExecutetInfluxDBQuery {
}
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) {
return null;
}
@ -83,7 +83,7 @@ public class TestExecutetInfluxDBQuery {
}
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) {
throw new RuntimeException("runtime exception");
}
@ -115,7 +115,7 @@ public class TestExecutetInfluxDBQuery {
}
@Override
protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) {
protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) {
throw new RuntimeException("runtime exception", new SocketTimeoutException("timeout"));
}