From 1663a6c09428cb0301c84e547cefc7e17ff2cd20 Mon Sep 17 00:00:00 2001 From: zenfenan Date: Sat, 12 May 2018 22:51:23 +0530 Subject: [PATCH] NIFI-5133: Implemented Google Cloud PubSub Processors Signed-off-by: Pierre Villard This closes #2724. --- .../nifi-gcp-processors/pom.xml | 4 + .../processors/gcp/AbstractGCPProcessor.java | 5 +- .../gcp/pubsub/AbstractGCPubSubProcessor.java | 65 +++++ .../gcp/pubsub/ConsumeGCPubSub.java | 201 ++++++++++++++ .../gcp/pubsub/PubSubAttributes.java | 42 +++ .../gcp/pubsub/PublishGCPubSub.java | 249 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 4 +- .../gcp/pubsub/AbstractGCPubSubIT.java | 47 ++++ .../gcp/pubsub/ConsumeGCPubSubIT.java | 102 +++++++ .../gcp/pubsub/PublishGCPubSubIT.java | 75 ++++++ 10 files changed, 791 insertions(+), 3 deletions(-) create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index eeb757b700..8469e2947a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -71,6 +71,10 @@ + + com.google.cloud + google-cloud-pubsub + com.tdunning json diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java index 0505d63745..0da6c62468 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java @@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; + import java.util.List; /** @@ -110,7 +111,7 @@ public abstract class AbstractGCPProcessor< * @return GoogleCredentials for the processor to access. * @see AuthCredentials */ - private GoogleCredentials getGoogleCredentials(final ProcessContext context) { + protected GoogleCredentials getGoogleCredentials(final ProcessContext context) { final GCPCredentialsService gcpCredentialsService = context.getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class); return gcpCredentialsService.getGoogleCredentials(); @@ -123,7 +124,7 @@ public abstract class AbstractGCPProcessor< @OnScheduled public void onScheduled(ProcessContext context) { final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context)); - this.cloudService = options.getService(); + this.cloudService = options != null ? options.getService() : null; } /** diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java new file mode 100644 index 0000000000..31dc0a7fd6 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.ServiceOptions; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.AbstractGCPProcessor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor { + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("gcp-pubsub-publish-batch-size") + .displayName("Batch Size") + .description("Indicates the number of messages the cloud service should bundle together in a batch. If not set and left empty, only one message " + + "will be used in a batch") + .required(true) + .defaultValue("15") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship after a successful Google Cloud Pub/Sub operation.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails.") + .build(); + + private static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) { + return null; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java new file mode 100644 index 0000000000..23aaff08ed --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.common.collect.ImmutableList; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION; + +@SeeAlso({PublishGCPubSub.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"}) +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " + + "the configured number of messages will be pulled in a single request, else only one message will be pulled.") +@WritesAttributes({ + @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION), + @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION), + @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION), + @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION), + @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION) +}) +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { + + public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() + .name("gcp-pubsub-subscription") + .displayName("Subscription") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .description("Name of the Google Cloud Pub/Sub Subscription") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private SubscriberStub subscriber = null; + private PullRequest pullRequest; + + private AtomicReference storedException = new AtomicReference<>(); + + @OnScheduled + public void onScheduled(ProcessContext context) { + final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); + + pullRequest = PullRequest.newBuilder() + .setMaxMessages(batchSize) + .setReturnImmediately(false) + .setSubscription(getSubscriptionName(context)) + .build(); + + try { + subscriber = getSubscriber(context); + } catch (IOException e) { + storedException.set(e); + getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{e}); + } + } + + @OnStopped + public void onStopped() { + if (subscriber != null) { + subscriber.shutdown(); + } + } + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.of(PROJECT_ID, + GCP_CREDENTIALS_PROVIDER_SERVICE, + SUBSCRIPTION, + BATCH_SIZE); + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + if (subscriber == null) { + + if (storedException.get() != null) { + getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{storedException.get()}); + } else { + getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor..."); + } + + context.yield(); + return; + } + + final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); + final List ackIds = new ArrayList<>(); + + for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { + if (message.hasMessage()) { + FlowFile flowFile = session.create(); + + final Map attributes = new HashMap<>(); + ackIds.add(message.getAckId()); + + attributes.put(ACK_ID_ATTRIBUTE, message.getAckId()); + attributes.put(SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize())); + attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId()); + attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount())); + attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds())); + attributes.putAll(message.getMessage().getAttributesMap()); + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray())); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, getSubscriptionName(context)); + } + } + + if (!ackIds.isEmpty()) { + AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() + .addAllAckIds(ackIds) + .setSubscription(getSubscriptionName(context)) + .build(); + subscriber.acknowledgeCallable().call(acknowledgeRequest); + } + } + + private String getSubscriptionName(ProcessContext context) { + final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue(); + final String projectId = context.getProperty(PROJECT_ID).getValue(); + + if (subscriptionName.contains("/")) { + return ProjectSubscriptionName.parse(subscriptionName).toString(); + } else { + return ProjectSubscriptionName.of(projectId, subscriptionName).toString(); + } + + } + + private SubscriberStub getSubscriber(ProcessContext context) throws IOException { + + final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() + .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) + .build(); + + return GrpcSubscriberStub.create(subscriberStubSettings); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java new file mode 100644 index 0000000000..6aaf04a6b0 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +public class PubSubAttributes { + + public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId"; + public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic"; + + public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic"; + public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to"; + + public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId"; + public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message"; + + public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize"; + public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message"; + + public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount"; + public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any"; + + public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime"; + public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published"; + + public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes"; + public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " + + "if the original Google Cloud Publisher client added any attributes to the message while sending"; +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java new file mode 100644 index 0000000000..79e86145f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION; + +@SeeAlso({ConsumeGCPubSub.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"}) +@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties." + + " If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.") +@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute", + description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +@WritesAttributes({ + @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION), + @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION) +}) +public class PublishGCPubSub extends AbstractGCPubSubProcessor{ + + public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() + .name("gcp-pubsub-topic") + .displayName("Topic Name") + .description("Name of the Google Cloud PubSub Topic") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.") + .build(); + + private Publisher publisher = null; + private AtomicReference storedException = new AtomicReference<>(); + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.of(PROJECT_ID, + GCP_CREDENTIALS_PROVIDER_SERVICE, + TOPIC_NAME, + BATCH_SIZE); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .displayName(propertyDescriptorName) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + } + + @Override + public Set getRelationships() { + return Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY)) + ); + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + try { + publisher = getPublisherBuilder(context).build(); + } catch (IOException e) { + getLogger().error("Failed to create Google Cloud PubSub Publisher due to {}", new Object[]{e}); + storedException.set(e); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger(); + final List flowFiles = session.get(flowFileCount); + + if (flowFiles.isEmpty() || publisher == null) { + if (storedException.get() != null) { + getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{storedException.get()}); + } + context.yield(); + return; + } + + final long startNanos = System.nanoTime(); + final List successfulFlowFiles = new ArrayList<>(); + final String topicName = getTopicName(context).toString(); + + try { + for (FlowFile flowFile : flowFiles) { + try { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + final ByteString flowFileContent = ByteString.copyFromUtf8(baos.toString()); + + PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent) + .setPublishTime(Timestamp.newBuilder().build()) + .putAllAttributes(getDynamicAttributesMap(context, flowFile)) + .build(); + + ApiFuture messageIdFuture = publisher.publish(message); + + while (messageIdFuture.isDone()) { + Thread.sleep(500L); + } + + final String messageId = messageIdFuture.get(); + final Map attributes = new HashMap<>(); + + attributes.put(MESSAGE_ID_ATTRIBUTE, messageId); + attributes.put(TOPIC_NAME_ATTRIBUTE, topicName); + + flowFile = session.putAllAttributes(flowFile, attributes); + successfulFlowFiles.add(flowFile); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof DeadlineExceededException) { + getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " + + "so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e); + session.transfer(flowFile, REL_RETRY); + } else { + getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", + new Object[]{topicName, e}); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + } + } finally { + if (!successfulFlowFiles.isEmpty()) { + session.transfer(successfulFlowFiles, REL_SUCCESS); + for (FlowFile flowFile : successfulFlowFiles) { + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis); + } + } + } + } + + @OnStopped + public void onStopped() { + shutdownPublisher(); + } + + private void shutdownPublisher() { + try { + if (publisher != null) { + publisher.shutdown(); + } + } catch (Exception e) { + getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{e}); + } + } + + private ProjectTopicName getTopicName(ProcessContext context) { + final String topic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue(); + final String projectId = context.getProperty(PROJECT_ID).getValue(); + + if (topic.contains("/")) { + return ProjectTopicName.parse(topic); + } else { + return ProjectTopicName.of(projectId, topic); + } + } + + private Map getDynamicAttributesMap(ProcessContext context, FlowFile flowFile) { + final Map attributes = new HashMap<>(); + for (final Map.Entry entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue(); + attributes.put(entry.getKey().getName(), value); + } + } + + return attributes; + } + + private Publisher.Builder getPublisherBuilder(ProcessContext context) { + final Long batchSize = context.getProperty(BATCH_SIZE).asLong(); + + return Publisher.newBuilder(getTopicName(context)) + .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) + .setBatchingSettings(BatchingSettings.newBuilder() + .setElementCountThreshold(batchSize) + .setIsEnabled(true) + .build()); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b5d5df79d0..249d19eb15 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,4 +15,6 @@ org.apache.nifi.processors.gcp.storage.PutGCSObject org.apache.nifi.processors.gcp.storage.FetchGCSObject org.apache.nifi.processors.gcp.storage.DeleteGCSObject -org.apache.nifi.processors.gcp.storage.ListGCSBucket \ No newline at end of file +org.apache.nifi.processors.gcp.storage.ListGCSBucket +org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub +org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java new file mode 100644 index 0000000000..933e6789bd --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; + +import java.util.HashMap; +import java.util.Map; + +public class AbstractGCPubSubIT { + + protected static final String PROJECT_ID = "my-gcm-client"; + protected static final String CONTROLLER_SERVICE = "GCPCredentialsService"; + protected static TestRunner runner; + + protected TestRunner setCredentialsCS(TestRunner runner) throws InitializationException { + final String serviceAccountJsonFilePath = "path/to/credentials/json"; + final Map propertiesMap = new HashMap<>(); + final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService(); + + propertiesMap.put("application-default-credentials", "false"); + propertiesMap.put("compute-engine-credentials", "false"); + propertiesMap.put("service-account-json-file", serviceAccountJsonFilePath); + + runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap); + runner.enableControllerService(credentialsControllerService); + runner.assertValid(credentialsControllerService); + + return runner; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java new file mode 100644 index 0000000000..e36d31c529 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE; + +public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{ + + @BeforeClass + public static void setup() throws InitializationException { + runner = TestRunners.newTestRunner(ConsumeGCPubSub.class); + } + + @Test + public void testSimpleConsume() throws InitializationException { + final String subscription = "my-sub"; + runner.clearTransferState(); + + runner = setCredentialsCS(runner); + + runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); + runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); + runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "10"); + + runner.assertValid(); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 10); + runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE); + } + + @Test + public void testConsumeWithBatchSize() throws InitializationException { + final String subscription = "my-sub"; + runner.clearTransferState(); + + runner = setCredentialsCS(runner); + + runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); + runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); + runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2"); + + runner.assertValid(); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2); + runner.run(); + runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 4); + + runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE); + } + + @Test + public void testConsumeWithFormattedSubscriptionName() throws InitializationException { + final String subscription = "projects/my-gcm-client/subscriptions/my-sub"; + runner.clearTransferState(); + + runner = setCredentialsCS(runner); + + runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); + runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); + runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2"); + + runner.assertValid(); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2); + runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java new file mode 100644 index 0000000000..b774db82f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE; + +public class PublishGCPubSubIT extends AbstractGCPubSubIT{ + + @BeforeClass + public static void setup() throws InitializationException { + runner = TestRunners.newTestRunner(PublishGCPubSub.class); + } + + @Test + public void testSimplePublish() throws InitializationException { + final String topic = "my-topic"; + + runner = setCredentialsCS(runner); + + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic); + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1"); + + runner.assertValid(); + + runner.enqueue("Testing simple publish"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1); + runner.assertAllFlowFilesContainAttribute(MESSAGE_ID_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(TOPIC_NAME_ATTRIBUTE); + } + + @Test + public void testPublishWithFormattedTopicName() throws InitializationException { + final String topic = "projects/my-gcm-client/topics/my-topic"; + + runner = setCredentialsCS(runner); + + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic); + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1"); + + runner.assertValid(); + + runner.enqueue("Testing publish with formatted topic name"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 2); + runner.assertAllFlowFilesContainAttribute(MESSAGE_ID_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(TOPIC_NAME_ATTRIBUTE); + } +}