diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index e93e210d91..d1ea44de44 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -64,12 +64,12 @@ org.apache.nifi nifi-mock - 1.16.0-SNAPSHOT test com.google.cloud google-cloud-core + 2.1.7 com.google.code.findbugs @@ -89,6 +89,11 @@ com.google.cloud google-cloud-pubsub + + com.google.cloud + google-cloud-pubsublite + 1.3.0 + com.tdunning json @@ -109,7 +114,6 @@ com.fasterxml.jackson.core jackson-core - ${jackson.version} test @@ -152,4 +156,4 @@ - \ No newline at end of file + diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java index 6aaf04a6b0..cfcdda5649 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java @@ -1,42 +1,50 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.gcp.pubsub; - -public class PubSubAttributes { - - public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId"; - public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic"; - - public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic"; - public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to"; - - public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId"; - public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message"; - - public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize"; - public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message"; - - public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount"; - public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any"; - - public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime"; - public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published"; - - public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes"; - public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " + - "if the original Google Cloud Publisher client added any attributes to the message while sending"; -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +public class PubSubAttributes { + + public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId"; + public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic"; + + public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic"; + public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to"; + + public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId"; + public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message"; + + public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize"; + public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message"; + + public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount"; + public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any"; + + public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime"; + public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published"; + + public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes"; + public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " + + "if the original Google Cloud Publisher client added any attributes to the message while sending"; + + public static final String ORDERING_KEY_ATTRIBUTE = "gcp.pubsub.ordering.key"; + public static final String ORDERING_KEY_DESCRIPTION = "If non-empty, identifies related messages for which publish order should be" + + " respected. If a 'Subscription' has 'enable_message_ordering' set to 'true'," + + " messages published with the same non-empty 'ordering_key' value will be" + + " delivered to subscribers in the order in which they are received by the" + + " Pub/Sub system. All 'PubsubMessage's published in a given 'PublishRequest'" + + " must specify the same 'ordering_key' value."; +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java new file mode 100644 index 0000000000..be94fe5168 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.lite; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; +import com.google.cloud.pubsublite.cloudpubsub.Subscriber; +import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings; +import com.google.common.collect.ImmutableList; +import com.google.pubsub.v1.PubsubMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION; + +@SeeAlso({PublishGCPubSubLite.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"}) +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription. In its current state, this processor " + + "will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with " + + "'Use Application Default Credentials' or 'Use Compute Engine Credentials'.") +@WritesAttributes({ + @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION), + @WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION), + @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION), + @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION), + @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION) +}) +public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor { + + public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() + .name("gcp-pubsub-subscription") + .displayName("Subscription") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .description("Name of the Google Cloud Pub/Sub Subscription. Example: projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor BYTES_OUTSTANDING = new PropertyDescriptor + .Builder().name("gcp-bytes-outstanding") + .displayName("Bytes Outstanding") + .description("The number of quota bytes that may be outstanding to the client.") + .required(true) + .defaultValue("10 MB") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MESSAGES_OUTSTANDING = new PropertyDescriptor + .Builder().name("gcp-messages-outstanding") + .displayName("Messages Outstanding") + .description("The number of messages that may be outstanding to the client.") + .required(true) + .defaultValue("1000") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private Subscriber subscriber = null; + private static final BlockingQueue messages = new LinkedBlockingQueue<>(); + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final Collection results = new ArrayList(1); + final String subscription = validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue(); + + try { + SubscriptionPath.parse(subscription); + } catch (final ApiException e) { + results.add(new ValidationResult.Builder() + .subject(SUBSCRIPTION.getName()) + .input(subscription) + .valid(false) + .explanation("The Suscription does not have a valid format.") + .build()); + } + + return results; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + try { + if (subscriber == null) { + subscriber = getSubscriber(context); + } + } catch (final Exception e) { + getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", e); + throw new ProcessException(e); + } + try { + subscriber.startAsync().awaitRunning(); + } catch (final Exception e) { + getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", subscriber.failureCause()); + throw new ProcessException(e); + } + } + + @OnStopped + public void onStopped() { + try { + if (subscriber != null) { + subscriber.stopAsync().awaitTerminated(); + subscriber = null; + } + } catch (final Exception e) { + getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Subscriber", e); + } + } + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.of(SUBSCRIPTION, + GCP_CREDENTIALS_PROVIDER_SERVICE, + BYTES_OUTSTANDING, + MESSAGES_OUTSTANDING); + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (subscriber == null) { + getLogger().error("Google Cloud PubSub Lite Subscriber was not properly created. Yielding the processor..."); + context.yield(); + return; + } + + if (!subscriber.isRunning()) { + getLogger().error("Google Cloud PubSub Lite Subscriber is not running. Yielding the processor...", subscriber.failureCause()); + throw new ProcessException(subscriber.failureCause()); + } + + final Message message = messages.poll(); + if (message == null) { + context.yield(); + return; + } + + FlowFile flowFile = session.create(); + + final Map attributes = new HashMap<>(); + attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId()); + attributes.put(ORDERING_KEY_ATTRIBUTE, message.getMessage().getOrderingKey()); + attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount())); + attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds())); + attributes.putAll(message.getMessage().getAttributesMap()); + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toStringUtf8().getBytes())); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue()); + + message.getConsumer().ack(); + } + + private Subscriber getSubscriber(final ProcessContext context) throws IOException { + + final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue()); + + final FlowControlSettings flowControlSettings = FlowControlSettings.builder() + .setBytesOutstanding(context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue()) + .setMessagesOutstanding(context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong()) + .build(); + + final MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + try { + messages.put(new Message(message, consumer)); + } catch (final InterruptedException e) { + getLogger().error("Could not save the message inside the internal queue of the processor", e); + } + }; + + final SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder() + .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) + .setSubscriptionPath(subscriptionPath) + .setReceiver(receiver) + .setPerPartitionFlowControlSettings(flowControlSettings) + .build(); + + return Subscriber.create(subscriberSettings); + } + + private class Message { + private PubsubMessage message; + private AckReplyConsumer consumer; + + public Message(final PubsubMessage message, final AckReplyConsumer consumer) { + this.message = message; + this.consumer = consumer; + } + + public PubsubMessage getMessage() { + return message; + } + + public AckReplyConsumer getConsumer() { + return consumer; + } + } + + @Override + public List verify(final ProcessContext context, final ComponentLog verificationLogger, final Map attributes) { + final List verificationResults = new ArrayList<>(); + try { + getSubscriber(context); + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Create the Subscriber") + .outcome(Outcome.SUCCESSFUL) + .explanation("Successfully created the Google Cloud PubSub Lite Subscriber") + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to create Google Cloud PubSub Lite Subscriber", e); + + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Create the Subscriber") + .outcome(Outcome.FAILED) + .explanation("Failed to create Google Cloud PubSub Lite Subscriber: " + e.getLocalizedMessage()) + .build()); + } + return verificationResults; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java new file mode 100644 index 0000000000..7e97b3bf08 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.lite; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.cloudpubsub.Publisher; +import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.pubsub.v1.PubsubMessage; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor; +import org.threeten.bp.Duration; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION; + +@SeeAlso({ConsumeGCPubSubLite.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"}) +@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." + + " If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'. In its current state, this processor will " + + "only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default " + + "Credentials' or 'Use Compute Engine Credentials'.") +@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute", + description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +@WritesAttributes({ + @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION), + @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION) +}) +@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content " + + "will be read into memory to be sent as a PubSub message.") +public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor { + + public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() + .name("gcp-pubsub-topic") + .displayName("Topic Name") + .description("Name of the Google Cloud PubSub Topic. Example: projects/8476107443/locations/europe-west1-d/topics/my-lite-topic") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor ORDERING_KEY = new PropertyDescriptor + .Builder().name("gcp-ordering-key") + .displayName("Ordering Key") + .description("Messages with the same ordering key will always get published to the same partition. When this property is not " + + "set, messages can get published to different partitions if more than one partition exists for the topic.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor + .Builder().name("gcp-batch-bytes") + .displayName("Batch Bytes Threshold") + .description("Publish request gets triggered based on this Batch Bytes Threshold property and" + + " the " + BATCH_SIZE.getDisplayName() + " property, whichever condition is met first.") + .required(true) + .defaultValue("3 MB") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + private Publisher publisher = null; + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.of(TOPIC_NAME, + GCP_CREDENTIALS_PROVIDER_SERVICE, + ORDERING_KEY, + BATCH_SIZE, + BATCH_BYTES); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .displayName(propertyDescriptorName) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + } + + @Override + public Set getRelationships() { + return Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)) + ); + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final Collection results = new ArrayList(1); + final String topic = validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue(); + + try { + TopicPath.parse(topic); + } catch (final ApiException e) { + results.add(new ValidationResult.Builder() + .subject(TOPIC_NAME.getName()) + .input(topic) + .valid(false) + .explanation("The Topic does not have a valid format.") + .build()); + } + + return results; + } + + @Override + @OnScheduled + public void onScheduled(final ProcessContext context) { + try { + if (publisher == null) { + publisher = getPublisher(context); + } + } catch (final Exception e) { + getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", e); + throw new ProcessException(e); + } + try { + publisher.startAsync().awaitRunning(); + } catch (final Exception e) { + getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", publisher.failureCause()); + throw new ProcessException(e); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger(); + final List flowFiles = session.get(flowFileCount); + + if (flowFiles.isEmpty()) { + context.yield(); + return; + } + + if (publisher == null) { + getLogger().error("Google Cloud PubSub Lite Publisher was not properly created. Yielding the processor..."); + context.yield(); + return; + } + + if(!publisher.isRunning()) { + getLogger().error("Google Cloud PubSub Lite Publisher is not running. Yielding the processor...", publisher.failureCause()); + throw new ProcessException(publisher.failureCause()); + } + + final long startNanos = System.nanoTime(); + final List successfulFlowFiles = new ArrayList<>(); + final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue(); + final List> futures = new ArrayList<>(); + + try { + for (FlowFile flowFile : flowFiles) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray()); + final String orderingKey = context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + final PubsubMessage.Builder message = PubsubMessage.newBuilder().setData(flowFileContent) + .setPublishTime(Timestamp.newBuilder().build()) + .putAllAttributes(getDynamicAttributesMap(context, flowFile)); + + if (orderingKey != null) { + message.setOrderingKey(orderingKey); + } + + final ApiFuture messageIdFuture = publisher.publish(message.build()); + futures.add(messageIdFuture); + + flowFile = session.putAttribute(flowFile, TOPIC_NAME_ATTRIBUTE, topicName); + } + + try { + ApiFutures.allAsList(futures).get(); + successfulFlowFiles.addAll(flowFiles); + } catch (InterruptedException | ExecutionException e) { + getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, " + + "routing all messages from the batch to failure", new Object[]{topicName, e.getLocalizedMessage()}, e); + session.transfer(flowFiles, REL_FAILURE); + context.yield(); + } + } finally { + if (!successfulFlowFiles.isEmpty()) { + session.transfer(successfulFlowFiles, REL_SUCCESS); + for (FlowFile flowFile : successfulFlowFiles) { + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis); + } + } + } + } + + @OnStopped + public void onStopped() { + try { + if (publisher != null) { + publisher.stopAsync().awaitTerminated(); + publisher = null; + } + } catch (final Exception e) { + getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Publisher", e); + } + } + + private Map getDynamicAttributesMap(final ProcessContext context, final FlowFile flowFile) { + final Map attributes = new HashMap<>(); + for (final Map.Entry entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue(); + attributes.put(entry.getKey().getName(), value); + } + } + + return attributes; + } + + private Publisher getPublisher(final ProcessContext context) { + final TopicPath topicPath = TopicPath.parse(context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue()); + + final PublisherSettings publisherSettings = + PublisherSettings.newBuilder() + .setTopicPath(topicPath) + .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) + .setBatchingSettings(BatchingSettings.newBuilder() + .setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong()) + .setDelayThreshold(Duration.ofMillis(100)) + .setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue()) + .setIsEnabled(true) + .build()) + .build(); + + return Publisher.create(publisherSettings); + } + + @Override + public List verify(final ProcessContext context, final ComponentLog verificationLogger, final Map attributes) { + final List verificationResults = new ArrayList<>(); + try { + getPublisher(context); + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Create the Publisher") + .outcome(Outcome.SUCCESSFUL) + .explanation("Successfully created the Google Cloud PubSub Lite Publisher") + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to create Google Cloud PubSub Lite Publisher", e); + + verificationResults.add(new ConfigVerificationResult.Builder() + .verificationStepName("Create the Publisher") + .outcome(Outcome.FAILED) + .explanation("Failed to create Google Cloud PubSub Lite Publisher: " + e.getLocalizedMessage()) + .build()); + } + return verificationResults; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9d26958e9b..dc4c4402f9 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,5 +18,7 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject org.apache.nifi.processors.gcp.storage.ListGCSBucket org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub +org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite +org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml index 66352f7268..1f99faf645 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml @@ -33,6 +33,7 @@ com.google.auth google-auth-library-oauth2-http + 1.2.1 com.google.code.findbugs diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml index b6289b98fb..e7cd1c1d37 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml @@ -27,7 +27,7 @@ pom - 0.125.0 + 0.162.0