NIFI-11981 - PublishGCPubSub failure / Record-based processing / AVRO

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #7638
This commit is contained in:
Paul Grey 2023-08-22 16:42:03 -04:00 committed by Matt Burgess
parent 016834efaa
commit c0c1b386f6
4 changed files with 40 additions and 20 deletions

View File

@ -209,6 +209,7 @@
<exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json</exclude>
<exclude>src/test/resources/pubsub/records.avro</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -369,20 +369,21 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
final List<Throwable> failures = new ArrayList<>();
final Map<String, String> attributes = flowFile.getAttributes();
final RecordReader reader = readerFactory.createRecordReader(
attributes, session.read(flowFile), flowFile.getSize(), getLogger());
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
try (final RecordReader reader = readerFactory.createRecordReader(
attributes, session.read(flowFile), flowFile.getSize(), getLogger())) {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes);
final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes);
final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
while (pushBackRecordSet.isAnotherRecord()) {
final ApiFuture<String> apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
futures.add(apiFuture);
addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
while (pushBackRecordSet.isAnotherRecord()) {
final ApiFuture<String> apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
futures.add(apiFuture);
addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
}
flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
}
flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
}
finishBatch(session, stopWatch, flowFileResults);
}

View File

@ -24,6 +24,7 @@ import com.google.cloud.pubsub.v1.Publisher;
import io.grpc.Status;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
@ -32,6 +33,7 @@ import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControll
import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -154,12 +156,29 @@ public class PublishGCPubSubTest {
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
}
@Test
void testSendOneSuccessRecordStrategyAvroReader() throws InitializationException, IOException {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new AvroReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource("pubsub/records.avro"))));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
assertEquals("3", flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE));
}
@Test
void testSendOneSuccessRecordStrategy() throws InitializationException, IOException {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner));
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new JsonTreeReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
@ -178,7 +197,7 @@ public class PublishGCPubSubTest {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner));
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new JsonTreeReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
@ -195,7 +214,7 @@ public class PublishGCPubSubTest {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner));
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new JsonTreeReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
@ -214,15 +233,14 @@ public class PublishGCPubSubTest {
return controllerServiceId;
}
private static String getReaderServiceId(TestRunner runner) throws InitializationException {
final ControllerService readerService = new JsonTreeReader();
final String readerServiceId = readerService.getClass().getName();
runner.addControllerService(readerServiceId, readerService);
runner.enableControllerService(readerService);
private static String getReaderServiceId(
final TestRunner runner, final RecordReaderFactory recordReaderFactory) throws InitializationException {
final String readerServiceId = recordReaderFactory.getClass().getName();
runner.addControllerService(readerServiceId, recordReaderFactory);
runner.enableControllerService(recordReaderFactory);
return readerServiceId;
}
private static String getWriterServiceId(TestRunner runner) throws InitializationException {
final ControllerService writerService = new JsonRecordSetWriter();
final String writerServiceId = writerService.getClass().getName();