From 46b1f6755c5ca3cc4bdb55e48fd09e1216b66d71 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 26 May 2021 11:34:51 -0400 Subject: [PATCH] NIFI-8631: Ensure that GCP Pub/Sub messages are not acknowledged until session has been committed, in order ot ensure that we don't have data loss This closes #5102. Signed-off-by: Peter Turcsanyi --- .../gcp/pubsub/ConsumeGCPubSub.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java index 5693721be8..70b9e26ec7 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java @@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -130,11 +131,10 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { if (subscriber == null) { - if (storedException.get() != null) { - getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{storedException.get()}); + getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get()); } else { getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor..."); } @@ -145,6 +145,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); final List ackIds = new ArrayList<>(); + final String subscriptionName = getSubscriptionName(context); for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { if (message.hasMessage()) { @@ -164,20 +165,26 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray())); session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().receive(flowFile, getSubscriptionName(context)); + session.getProvenanceReporter().receive(flowFile, subscriptionName); } } - if (!ackIds.isEmpty()) { - AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() - .addAllAckIds(ackIds) - .setSubscription(getSubscriptionName(context)) - .build(); - subscriber.acknowledgeCallable().call(acknowledgeRequest); - } + session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName)); } - private String getSubscriptionName(ProcessContext context) { + private void acknowledgeAcks(final Collection ackIds, final String subscriptionName) { + if (ackIds == null || ackIds.isEmpty()) { + return; + } + + AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() + .addAllAckIds(ackIds) + .setSubscription(subscriptionName) + .build(); + subscriber.acknowledgeCallable().call(acknowledgeRequest); + } + + private String getSubscriptionName(final ProcessContext context) { final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue(); final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(); @@ -189,8 +196,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { } - private SubscriberStub getSubscriber(ProcessContext context) throws IOException { - + private SubscriberStub getSubscriber(final ProcessContext context) throws IOException { final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .build();