From ed30bb9b7872840b3b5348766c0e68b69617abd8 Mon Sep 17 00:00:00 2001 From: Michal Misiewicz Date: Sun, 29 Apr 2018 10:01:34 +0200 Subject: [PATCH] NIFI-5130 ExecuteInfluxDBQuery processor chunking support This closes #2666 Signed-off-by: Mike Thomsen --- .../influxdb/ExecuteInfluxDBQuery.java | 74 +++++++++++++++++-- .../influxdb/AbstractITInfluxDB.java | 9 +++ .../influxdb/ITExecuteInfluxDBQuery.java | 58 ++++++++++++--- .../influxdb/TestExecutetInfluxDBQuery.java | 22 +++--- 4 files changed, 135 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java index ddb09724ad..a029f14a9c 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.nifi.processors.influxdb; + import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -32,9 +33,11 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.InfluxDB; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import com.google.gson.Gson; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -48,6 +51,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -64,6 +69,8 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + private static final int DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE = 0; + public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() .name("influxdb-query-result-time-unit") .displayName("Query Result Time Units") @@ -86,6 +93,20 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + public static final Integer MAX_CHUNK_SIZE = 10000; + + public static final PropertyDescriptor INFLUX_DB_QUERY_CHUNK_SIZE = new PropertyDescriptor.Builder() + .name("influxdb-query-chunk-size") + .displayName("Results chunk size") + .description("Chunking can be used to return results in a stream of smaller batches " + "(each holds partial results up to the chunk size) rather than as a single response. " + "Chunking queries can return an unlimited number of rows. " +
"Note: Chunking is enabled when the result chunk size is greater than 0") + .defaultValue(String.valueOf(DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE)) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.createLongValidator(0, MAX_CHUNK_SIZE, true)) + .required(true) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("Successful InfluxDB queries are routed to this relationship").build(); @@ -111,6 +132,7 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT); tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT); tempDescriptors.add(INFLUX_DB_QUERY); + tempDescriptors.add(INFLUX_DB_QUERY_CHUNK_SIZE); tempDescriptors.add(USERNAME); tempDescriptors.add(PASSWORD); tempDescriptors.add(CHARSET); @@ -189,9 +211,10 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { try { long startTimeMillis = System.currentTimeMillis(); - QueryResult result = executeQuery(context, database, query, queryResultTimeunit); + int chunkSize = context.getProperty(INFLUX_DB_QUERY_CHUNK_SIZE).evaluateAttributeExpressions(outgoingFlowFile).asInteger(); + List<QueryResult> result = executeQuery(context, database, query, queryResultTimeunit, chunkSize); - String json = gson.toJson(result); + String json = result.size() == 1 ? gson.toJson(result.get(0)) : gson.toJson(result); if ( getLogger().isDebugEnabled() ) { getLogger().debug("Query result {} ", new Object[] {result}); @@ -203,13 +226,13 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { final long endTimeMillis = System.currentTimeMillis(); - if ( ! result.hasError() ) { + if ( ! hasErrors(result) ) { outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query)); session.getProvenanceReporter().send(outgoingFlowFile, makeProvenanceUrl(context, database), (endTimeMillis - startTimeMillis)); session.transfer(outgoingFlowFile, REL_SUCCESS); } else { - outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, result.getError()); + outgoingFlowFile = populateErrorAttributes(session, outgoingFlowFile, query, queryErrors(result)); session.transfer(outgoingFlowFile, REL_FAILURE); } @@ -242,8 +265,31 @@ public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { .append(database).toString(); } - protected QueryResult executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit) { - return getInfluxDB(context).query(new Query(query, database),timeunit); + protected List<QueryResult> executeQuery(final ProcessContext context, String database, String query, TimeUnit timeunit, + int chunkSize) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + InfluxDB influx = getInfluxDB(context); + Query influxQuery = new Query(query, database); + + if (chunkSize > 0) { + List<QueryResult> results = new LinkedList<>(); + influx.query(influxQuery, chunkSize, result -> { + if (isQueryDone(result.getError())) { + latch.countDown(); + } else { + results.add(result); + } + }); + latch.await(); + + return results; + } else { + return Collections.singletonList(influx.query(influxQuery, timeunit)); + } + } + + private boolean isQueryDone(String error) { + return error != null && error.equals("DONE"); } protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, String query,
return flowFile; } + private Boolean hasErrors(List<QueryResult> results) { + for (QueryResult result: results) { + if (result.hasError()) { + return true; + } + } + return false; + } + + private String queryErrors(List<QueryResult> results) { + return results.stream() + .filter(QueryResult::hasError) + .map(QueryResult::getError) + .collect(Collectors.joining("\n")); + } + @OnStopped public void close() { super.close(); diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java index 48ec1495dd..120a92de3f 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java @@ -15,6 +15,8 @@ * limitations under the License. */ package org.apache.nifi.processors.influxdb; + +import com.google.gson.reflect.TypeToken; import org.apache.nifi.util.TestRunner; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; @@ -22,6 +24,9 @@ import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import org.junit.After; +import java.lang.reflect.Type; +import java.util.List; + /** * Base integration test class for InfluxDB processors */ @@ -34,6 +39,8 @@ public class AbstractITInfluxDB { protected String password = "admin"; protected static final String DEFAULT_RETENTION_POLICY = "autogen"; + protected Type QueryResultListType = new TypeToken<List<QueryResult>>(){}.getType(); + protected void initInfluxDB() throws InterruptedException, Exception { influxDB = InfluxDBFactory.connect(dbUrl,user,password); influxDB.createDatabase(dbName); @@ -52,6 +59,8 @@ public class AbstractITInfluxDB { checkError(result); result = influxDB.query(new Query("DROP measurement testm", dbName)); checkError(result); + result = influxDB.query(new Query("DROP measurement chunkedQueryTest", dbName)); + checkError(result); result = influxDB.query(new Query("DROP database " + dbName, dbName)); Thread.sleep(1000); } diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java index 0a0844fc73..a503731a35 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java +++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java @@ -16,28 +16,31 @@ */ package org.apache.nifi.processors.influxdb; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import org.junit.Assert; -import java.io.StringReader; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.influxdb.InfluxDB; import org.influxdb.dto.QueryResult; import org.influxdb.dto.QueryResult.Series; + +import
java.io.StringReader; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.junit.Before; import org.junit.Test; import com.google.gson.Gson; + + /** * Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running * on local host with default port and has database test with table test. Please set user @@ -274,12 +277,12 @@ public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB { } @Test - public void testQueryResultHasError() { + public void testQueryResultHasError() throws Throwable { ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new ExecuteInfluxDBQuery() { @Override - protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { - QueryResult result = super.executeQuery(context, database, query, timeunit); - result.setError("Test Error"); + protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) throws InterruptedException { + List<QueryResult> result = super.executeQuery(context, database, query, timeunit, chunkSize); + result.get(0).setError("Test Error"); return result; } @@ -326,4 +329,37 @@ public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB { runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}"); testValidTwoPoints(); } + + @Test + public void testChunkedQuery() { + String message = + "chunkedQueryTest,country=GB value=1 1524938495000000000" + System.lineSeparator() + + "chunkedQueryTest,country=PL value=2 1524938505000000000" + System.lineSeparator() + + "chunkedQueryTest,country=US value=3 1524938505800000000"; + + influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + + String query = "select * from chunkedQueryTest"; + byte [] bytes = query.getBytes(); + runner.enqueue(bytes); + runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY_CHUNK_SIZE, "2"); + runner.run(1,true,true); + + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); + assertEquals("Value should be equal", 1, flowFiles.size()); + assertNull("Value should be null", flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); + + List<QueryResult> queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResultListType); + + assertNotNull("QueryResult array should not be null", queryResult); + assertEquals("QueryResult array size should be equal 2", 2, queryResult.size()); + + assertEquals("First chunk should have 2 elements", 2, chunkSize(queryResult.get(0))); + assertEquals("Second chunk should have 1 element", 1, chunkSize(queryResult.get(1))); + } + + private int chunkSize(QueryResult queryResult) { + return queryResult.getResults().get(0).getSeries().get(0).getValues().size(); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java index dfed7009ba..d6aad18a6a 100644 --- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java +++
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.processors.influxdb; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -37,6 +29,14 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + public class TestExecutetInfluxDBQuery { private TestRunner runner; private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery; @@ -50,7 +50,7 @@ public class TestExecutetInfluxDBQuery { } @Override - protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) { return null; } @@ -83,7 +83,7 @@ public class TestExecutetInfluxDBQuery { } @Override - protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) { throw new RuntimeException("runtime exception"); } @@ -115,7 +115,7 @@ public class TestExecutetInfluxDBQuery { } @Override - protected QueryResult executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit) { + protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) { throw new RuntimeException("runtime exception", new SocketTimeoutException("timeout")); }
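For readers following the change, the chunked path added in executeQuery() boils down to influxdb-java's callback-based query API. The standalone sketch below is not part of the patch; the connection URL, credentials, database name and measurement are illustrative assumptions only.

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

public class ChunkedQuerySketch {
    public static void main(String[] args) throws InterruptedException {
        // Placeholder connection details; adjust for a real InfluxDB instance.
        InfluxDB influx = InfluxDBFactory.connect("http://localhost:8086", "admin", "admin");
        Query query = new Query("select * from chunkedQueryTest", "test");

        List<QueryResult> chunks = new LinkedList<>();
        CountDownLatch done = new CountDownLatch(1);

        // Each callback delivers at most chunkSize rows per series. influxdb-java signals
        // end-of-stream with a QueryResult whose error is the literal string "DONE",
        // which is why the processor treats that value as completion rather than failure.
        influx.query(query, 2, result -> {
            if ("DONE".equals(result.getError())) {
                done.countDown();
            } else {
                chunks.add(result);
            }
        });

        done.await();
        System.out.println("Received " + chunks.size() + " chunk(s)");
        influx.close();
    }
}

Note that when the processor collects more than one chunk it serializes the list as a JSON array of QueryResult objects (which the integration test parses with a TypeToken for List<QueryResult>), while a single result keeps the original single-object JSON, so existing non-chunked flows see no change in output format.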