NIFI-8631: Ensure that GCP Pub/Sub messages are not acknowledged until session has been committed, in order ot ensure that we don't have data loss

This closes #5102.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Mark Payne 2021-05-26 11:34:51 -04:00 committed by Peter Turcsanyi
parent 1e6161c0aa
commit 46b1f6755c
1 changed files with 20 additions and 14 deletions

View File

@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -130,11 +131,10 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
} }
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (subscriber == null) { if (subscriber == null) {
if (storedException.get() != null) { if (storedException.get() != null) {
getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{storedException.get()}); getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());
} else { } else {
getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor..."); getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
} }
@ -145,6 +145,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
final List<String> ackIds = new ArrayList<>(); final List<String> ackIds = new ArrayList<>();
final String subscriptionName = getSubscriptionName(context);
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
if (message.hasMessage()) { if (message.hasMessage()) {
@ -164,20 +165,26 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray())); flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, getSubscriptionName(context)); session.getProvenanceReporter().receive(flowFile, subscriptionName);
} }
} }
if (!ackIds.isEmpty()) { session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.addAllAckIds(ackIds)
.setSubscription(getSubscriptionName(context))
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}
} }
private String getSubscriptionName(ProcessContext context) { private void acknowledgeAcks(final Collection<String> ackIds, final String subscriptionName) {
if (ackIds == null || ackIds.isEmpty()) {
return;
}
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.addAllAckIds(ackIds)
.setSubscription(subscriptionName)
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}
private String getSubscriptionName(final ProcessContext context) {
final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue(); final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(); final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
@ -189,8 +196,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
} }
private SubscriberStub getSubscriber(ProcessContext context) throws IOException { private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.build(); .build();