From c0c1b386f61737635eab3f42e3d0b97296fa09f3 Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Tue, 22 Aug 2023 16:42:03 -0400 Subject: [PATCH] NIFI-11981 - PublishGCPubSub failure / Record-based processing / AVRO Signed-off-by: Matt Burgess This closes #7638 --- .../nifi-gcp-processors/pom.xml | 1 + .../gcp/pubsub/PublishGCPubSub.java | 23 +++++------ .../gcp/pubsub/PublishGCPubSubTest.java | 36 +++++++++++++----- .../src/test/resources/pubsub/records.avro | Bin 0 -> 374 bytes 4 files changed, 40 insertions(+), 20 deletions(-) create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index 84e84417ea..13fab32080 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -209,6 +209,7 @@ src/test/resources/bigquery/schema-correct-data-with-date.avsc src/test/resources/bigquery/streaming-correct-data-with-date.json src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json + src/test/resources/pubsub/records.avro diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java index f2d5ed7ebd..d48652f0b6 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java @@ -369,20 +369,21 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { final List failures = new ArrayList<>(); final Map attributes = flowFile.getAttributes(); - final RecordReader reader = readerFactory.createRecordReader( - attributes, session.read(flowFile), flowFile.getSize(), getLogger()); - final RecordSet recordSet = reader.createRecordSet(); - final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema()); + try (final RecordReader reader = readerFactory.createRecordReader( + attributes, session.read(flowFile), flowFile.getSize(), getLogger())) { + final RecordSet recordSet = reader.createRecordSet(); + final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema()); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes); - final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet); + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes); + final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet); - while (pushBackRecordSet.isAnotherRecord()) { - final ApiFuture apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next()); - futures.add(apiFuture); - addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor); + while (pushBackRecordSet.isAnotherRecord()) { + final ApiFuture apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next()); + futures.add(apiFuture); + addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor); + } + flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures)); } - flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures)); } finishBatch(session, stopWatch, flowFileResults); } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java index 2870942834..66bd66af72 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java @@ -24,6 +24,7 @@ import com.google.cloud.pubsub.v1.Publisher; import io.grpc.Status; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.avro.AvroReader; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; @@ -32,6 +33,7 @@ import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControll import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy; import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -154,12 +156,29 @@ public class PublishGCPubSubTest { runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1); } + @Test + void testSendOneSuccessRecordStrategyAvroReader() throws InitializationException, IOException { + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new AvroReader())); + runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); + runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); + + runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull( + getClass().getClassLoader().getResource("pubsub/records.avro")))); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next(); + assertEquals("3", flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE)); + } + @Test void testSendOneSuccessRecordStrategy() throws InitializationException, IOException { runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); - runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner)); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new JsonTreeReader())); runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); @@ -178,7 +197,7 @@ public class PublishGCPubSubTest { runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); - runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner)); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new JsonTreeReader())); runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); @@ -195,7 +214,7 @@ public class PublishGCPubSubTest { runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); - runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner)); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner, new JsonTreeReader())); runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); @@ -214,15 +233,14 @@ public class PublishGCPubSubTest { return controllerServiceId; } - private static String getReaderServiceId(TestRunner runner) throws InitializationException { - final ControllerService readerService = new JsonTreeReader(); - final String readerServiceId = readerService.getClass().getName(); - runner.addControllerService(readerServiceId, readerService); - runner.enableControllerService(readerService); + private static String getReaderServiceId( + final TestRunner runner, final RecordReaderFactory recordReaderFactory) throws InitializationException { + final String readerServiceId = recordReaderFactory.getClass().getName(); + runner.addControllerService(readerServiceId, recordReaderFactory); + runner.enableControllerService(recordReaderFactory); return readerServiceId; } - private static String getWriterServiceId(TestRunner runner) throws InitializationException { final ControllerService writerService = new JsonRecordSetWriter(); final String writerServiceId = writerService.getClass().getName(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro new file mode 100644 index 0000000000000000000000000000000000000000..3456b31d4b5f3b01d730e3b4eb39852a6c5d19bb GIT binary patch literal 374 zcmeZI%3@>@ODrqO*DFrWNX<>0#$2sbQdy9yWTjM;nw(#hqNJmgmzWFU=Vhj41|f?T z7bGTwB=U>W^%8;Xj8r|48laA}%+#EeVkN8SYPe>Q89Gp-qm_zFiZb)kL1vfccAByrlb_578etxwkop#kGj}e5wORS^HWljIY1UMR6fg1yZ!v%d7;B4+LtD+H)cD* z$iU^8oSSOMpk!!dY@*