NIFI-12526: Fixed handling of Fetch Size in QueryCassandra, added fragment attributes

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8183.
This commit is contained in:
Matt Burgess 2023-12-21 20:45:28 -05:00 committed by Pierre Villard
parent 6e38beaccf
commit f3b38ddd40
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 122 additions and 51 deletions

View File

@ -182,6 +182,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONSISTENCY_LEVEL);
descriptors.add(COMPRESSION_TYPE);
descriptors.add(CHARSET);
}

View File

@ -48,12 +48,12 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@ -66,12 +66,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -85,7 +87,19 @@ import java.util.concurrent.atomic.AtomicLong;
+ "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.")
@WritesAttributes({@WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query")})
@WritesAttributes({
@WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query"),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ "attribute will not be populated."),
@WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")
})
public class QueryCassandra extends AbstractCassandraProcessor {
public static final String AVRO_FORMAT = "Avro";
@ -93,6 +107,10 @@ public class QueryCassandra extends AbstractCassandraProcessor {
public static final String RESULT_ROW_COUNT = "executecql.row.count";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
public static final PropertyDescriptor CQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("CQL select query")
.description("CQL select query")
@ -248,6 +266,8 @@ public class QueryCassandra extends AbstractCassandraProcessor {
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
final StopWatch stopWatch = new StopWatch(true);
final List<FlowFile> resultSetFlowFiles = new LinkedList<>();
try {
// The documentation for the driver recommends the session remain open the entire time the processor is running
// and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
@ -261,39 +281,37 @@ public class QueryCassandra extends AbstractCassandraProcessor {
}
final AtomicLong nrOfRows = new AtomicLong(0L);
long flowFileCount = 0;
if(fileToProcess == null) {
fileToProcess = session.create();
}
int fragmentIndex = 0;
final String fragmentId = UUID.randomUUID().toString();
while(true) {
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
logger.debug("Executing CQL query {}", new Object[]{selectQuery});
if (queryTimeout > 0) {
if (AVRO_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
out, queryTimeout, TimeUnit.MILLISECONDS));
} else if (JSON_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
out, charset, queryTimeout, TimeUnit.MILLISECONDS));
}
} else {
if (AVRO_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
out, 0, null));
} else if (JSON_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
out, charset, 0, null));
}
fileToProcess = session.write(fileToProcess, out -> {
try {
logger.debug("Executing CQL query {}", selectQuery);
if (queryTimeout > 0) {
if (AVRO_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
out, queryTimeout, TimeUnit.MILLISECONDS));
} else if (JSON_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
out, charset, queryTimeout, TimeUnit.MILLISECONDS));
}
} else {
if (AVRO_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
out, 0, null));
} else if (JSON_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
out, charset, 0, null));
}
} catch (final TimeoutException | InterruptedException | ExecutionException e) {
throw new ProcessException(e);
}
} catch (final TimeoutException | InterruptedException | ExecutionException e) {
throw new ProcessException(e);
}
});
@ -306,23 +324,37 @@ public class QueryCassandra extends AbstractCassandraProcessor {
if (logger.isDebugEnabled()) {
logger.info("{} contains {} records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
fileToProcess, nrOfRows.get());
}
if (maxRowsPerFlowFile > 0) {
fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentId);
fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(fileToProcess, REL_SUCCESS);
resultSetFlowFiles.add(fileToProcess);
if (outputBatchSize > 0) {
flowFileCount++;
if (flowFileCount == outputBatchSize) {
if (resultSetFlowFiles.size() == outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
session.commitAsync();
flowFileCount = 0;
// fileToProcess = session.create();
resultSetFlowFiles.clear();
}
}
fragmentIndex++;
resultSet.fetchMoreResults().get();
if (resultSet.isExhausted()) {
// If we are splitting results but not outputting batches, set count on all FlowFiles
if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
}
}
session.transfer(resultSetFlowFiles, REL_SUCCESS);
session.commitAsync();
resultSetFlowFiles.clear();
break;
}
fileToProcess = session.create();
@ -408,11 +440,6 @@ public class QueryCassandra extends AbstractCassandraProcessor {
session.commitAsync();
}
private void handleException() {
}
@OnUnscheduled
public void stop(ProcessContext context) {
super.stop(context);
@ -463,26 +490,19 @@ public class QueryCassandra extends AbstractCassandraProcessor {
} else {
rs.fetchMoreResults().get(timeout, timeUnit);
}
rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching();
}
if(maxRowsPerFlowFile == 0){
maxRowsPerFlowFile = rowsAvailableWithoutFetching;
}
Row row;
//Iterator<Row> it = rs.iterator();
while(nrOfRows < maxRowsPerFlowFile){
while ((maxRowsPerFlowFile == 0) || nrOfRows < maxRowsPerFlowFile) {
try {
row = rs.iterator().next();
}catch (NoSuchElementException nsee){
nrOfRows -= 1;
} catch (NoSuchElementException nsee) {
break;
}
// iterator().next() is like iterator().one() => return null on end
// https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/ResultSet.html#one--
if(row == null){
if (row == null) {
break;
}

View File

@ -119,7 +119,7 @@ public class QueryCassandraIT {
public void testSimpleQuery() {
queryCassandraTestRunner.enqueue("");
queryCassandraTestRunner.run();
Assertions.assertEquals(LOAD_FLOW_FILE_SIZE, queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
Assertions.assertEquals(1, queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
queryCassandraTestRunner.clearTransferState();
}
@ -128,7 +128,27 @@ public class QueryCassandraIT {
queryCassandraTestRunner.removeProperty(QueryCassandra.OUTPUT_BATCH_SIZE);
queryCassandraTestRunner.enqueue("");
queryCassandraTestRunner.run();
Assertions.assertEquals(LOAD_FLOW_FILE_SIZE, queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
Assertions.assertEquals(1, queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
queryCassandraTestRunner.clearTransferState();
}
@Test
public void testWithMaxRowsPerFlowFile() {
queryCassandraTestRunner.setProperty(QueryCassandra.MAX_ROWS_PER_FLOW_FILE, "10");
queryCassandraTestRunner.enqueue("");
queryCassandraTestRunner.run();
Assertions.assertEquals(100, queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
queryCassandraTestRunner.clearTransferState();
}
@Test
public void testWithDefaults() {
queryCassandraTestRunner.removeProperty(QueryCassandra.MAX_ROWS_PER_FLOW_FILE);
queryCassandraTestRunner.removeProperty(QueryCassandra.OUTPUT_BATCH_SIZE);
queryCassandraTestRunner.removeProperty(QueryCassandra.FETCH_SIZE);
queryCassandraTestRunner.enqueue("");
queryCassandraTestRunner.run();
Assertions.assertEquals(1, queryCassandraTestRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS).size());
queryCassandraTestRunner.clearTransferState();
}

View File

@ -163,6 +163,34 @@ public class QueryCassandraTest {
new String(files.get(0).toByteArray()));
}
@Test
public void testProcessorJsonOutputFragmentAttributes() {
processor = new MockQueryCassandraTwoRounds();
testRunner = TestRunners.newTestRunner(processor);
setUpStandardProcessorConfig();
testRunner.setIncomingConnection(false);
testRunner.setProperty(QueryCassandra.MAX_ROWS_PER_FLOW_FILE, "1");
// Test JSON output
testRunner.setProperty(QueryCassandra.OUTPUT_FORMAT, QueryCassandra.JSON_FORMAT);
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_SUCCESS, 2);
List<MockFlowFile> files = testRunner.getFlowFilesForRelationship(QueryCassandra.REL_SUCCESS);
assertNotNull(files);
assertEquals(2, files.size(), "Two files should be transferred to success");
String indexIdentifier = null;
for (int i = 0; i < files.size(); i++) {
MockFlowFile flowFile = files.get(i);
flowFile.assertAttributeEquals(QueryCassandra.FRAGMENT_INDEX, String.valueOf(i));
if (indexIdentifier == null) {
indexIdentifier = flowFile.getAttribute(QueryCassandra.FRAGMENT_ID);
} else {
flowFile.assertAttributeEquals(QueryCassandra.FRAGMENT_ID, indexIdentifier);
}
flowFile.assertAttributeEquals(QueryCassandra.FRAGMENT_COUNT, String.valueOf(files.size()));
}
}
@Test
public void testProcessorELConfigJsonOutput() {
setUpStandardProcessorConfig();
@ -412,8 +440,10 @@ public class QueryCassandraTest {
@Test
public void testConvertToAvroStream() throws Exception {
processor = new MockQueryCassandraTwoRounds();
testRunner = TestRunners.newTestRunner(processor);
setUpStandardProcessorConfig();
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
ResultSet rs = CassandraQueryTestUtil.createMockResultSet(false);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
long numberOfRows = QueryCassandra.convertToAvroStream(rs, 0, baos, 0, null);
assertEquals(2, numberOfRows);