NIFI-13936 Removed GCP PubSub Lite Processors (#9455)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2024-10-28 15:36:40 +01:00 committed by GitHub
parent 63b0456bad
commit 8b6bf16929
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 0 additions and 688 deletions

View File

@ -124,10 +124,6 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-drive</artifactId>

View File

@ -1,297 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.lite;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
@SeeAlso({PublishGCPubSubLite.class})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription.")
@WritesAttributes({
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
@WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION),
@WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
@WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
@WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
})
public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
.name("gcp-pubsub-subscription")
.displayName("Subscription")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.description("Name of the Google Cloud Pub/Sub Subscription. Example: projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor BYTES_OUTSTANDING = new PropertyDescriptor
.Builder().name("gcp-bytes-outstanding")
.displayName("Bytes Outstanding")
.description("The number of quota bytes that may be outstanding to the client.")
.required(true)
.defaultValue("10 MB")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MESSAGES_OUTSTANDING = new PropertyDescriptor
.Builder().name("gcp-messages-outstanding")
.displayName("Messages Outstanding")
.description("The number of messages that may be outstanding to the client.")
.required(true)
.defaultValue("1000")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
SUBSCRIPTION,
BYTES_OUTSTANDING,
MESSAGES_OUTSTANDING
);
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
private Subscriber subscriber = null;
private BlockingQueue<Message> messages = new LinkedBlockingQueue<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<ValidationResult>(1);
final String subscription = validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
try {
SubscriptionPath.parse(subscription);
} catch (final ApiException e) {
results.add(new ValidationResult.Builder()
.subject(SUBSCRIPTION.getName())
.input(subscription)
.valid(false)
.explanation("The Suscription does not have a valid format.")
.build());
}
return results;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
try {
if (subscriber == null) {
subscriber = getSubscriber(context);
}
} catch (final Exception e) {
getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", e);
throw new ProcessException(e);
}
try {
subscriber.startAsync().awaitRunning();
} catch (final Exception e) {
getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", subscriber.failureCause());
throw new ProcessException(e);
}
}
@OnStopped
public void onStopped() {
try {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
subscriber = null;
}
} catch (final Exception e) {
getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Subscriber", e);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (subscriber == null) {
getLogger().error("Google Cloud PubSub Lite Subscriber was not properly created. Yielding the processor...");
context.yield();
return;
}
if (!subscriber.isRunning()) {
getLogger().error("Google Cloud PubSub Lite Subscriber is not running. Yielding the processor...", subscriber.failureCause());
throw new ProcessException(subscriber.failureCause());
}
final Message message = messages.poll();
if (message == null) {
context.yield();
return;
}
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
attributes.put(ORDERING_KEY_ATTRIBUTE, message.getMessage().getOrderingKey());
attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
attributes.putAll(message.getMessage().getAttributesMap());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toStringUtf8().getBytes()));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
message.getConsumer().ack();
}
private Subscriber getSubscriber(final ProcessContext context) {
final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
final FlowControlSettings flowControlSettings = FlowControlSettings.builder()
.setBytesOutstanding(context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue())
.setMessagesOutstanding(context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong())
.build();
final MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
try {
messages.put(new Message(message, consumer));
} catch (final InterruptedException e) {
getLogger().error("Could not save the message inside the internal queue of the processor", e);
}
};
final SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setSubscriptionPath(subscriptionPath)
.setReceiver(receiver)
.setPerPartitionFlowControlSettings(flowControlSettings)
.build();
return Subscriber.create(subscriberSettings);
}
private class Message {
private PubsubMessage message;
private AckReplyConsumer consumer;
public Message(final PubsubMessage message, final AckReplyConsumer consumer) {
this.message = message;
this.consumer = consumer;
}
public PubsubMessage getMessage() {
return message;
}
public AckReplyConsumer getConsumer() {
return consumer;
}
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
try {
getSubscriber(context);
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Create the Subscriber")
.outcome(Outcome.SUCCESSFUL)
.explanation("Successfully created the Google Cloud PubSub Lite Subscriber")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to create Google Cloud PubSub Lite Subscriber", e);
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Create the Subscriber")
.outcome(Outcome.FAILED)
.explanation("Failed to create Google Cloud PubSub Lite Subscriber: " + e.getLocalizedMessage())
.build());
}
return verificationResults;
}
@Override
protected GoogleCredentials getGoogleCredentials(final ProcessContext context) {
return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
}
}

View File

@ -1,320 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.lite;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.threeten.bp.Duration;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
@SeeAlso({ConsumeGCPubSubLite.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"})
@CapabilityDescription("Publishes the content of the incoming FlowFile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." +
" If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes({
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
@WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
+ "will be read into memory to be sent as a PubSub message.")
public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
.name("gcp-pubsub-topic")
.displayName("Topic Name")
.description("Name of the Google Cloud PubSub Topic. Example: projects/8476107443/locations/europe-west1-d/topics/my-lite-topic")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor ORDERING_KEY = new PropertyDescriptor
.Builder().name("gcp-ordering-key")
.displayName("Ordering Key")
.description("Messages with the same ordering key will always get published to the same partition. When this property is not "
+ "set, messages can get published to different partitions if more than one partition exists for the topic.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
TOPIC_NAME,
ORDERING_KEY,
BATCH_SIZE_THRESHOLD,
BATCH_BYTES_THRESHOLD,
BATCH_DELAY_THRESHOLD
);
public static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
private Publisher publisher = null;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<ValidationResult>(1);
final String topic = validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
try {
TopicPath.parse(topic);
} catch (final ApiException e) {
results.add(new ValidationResult.Builder()
.subject(TOPIC_NAME.getName())
.input(topic)
.valid(false)
.explanation("The Topic does not have a valid format.")
.build());
}
return results;
}
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
try {
if (publisher == null) {
publisher = getPublisher(context);
}
} catch (final Exception e) {
getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", e);
throw new ProcessException(e);
}
try {
publisher.startAsync().awaitRunning();
} catch (final Exception e) {
getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", publisher.failureCause());
throw new ProcessException(e);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int flowFileCount = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
final List<FlowFile> flowFiles = session.get(flowFileCount);
if (flowFiles.isEmpty()) {
context.yield();
return;
}
if (publisher == null) {
getLogger().error("Google Cloud PubSub Lite Publisher was not properly created. Yielding the processor...");
context.yield();
return;
}
if (!publisher.isRunning()) {
getLogger().error("Google Cloud PubSub Lite Publisher is not running. Yielding the processor...", publisher.failureCause());
throw new ProcessException(publisher.failureCause());
}
final long startNanos = System.nanoTime();
final List<FlowFile> successfulFlowFiles = new ArrayList<>();
final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
final List<ApiFuture<String>> futures = new ArrayList<>();
try {
for (FlowFile flowFile : flowFiles) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());
final String orderingKey = context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue();
final PubsubMessage.Builder message = PubsubMessage.newBuilder().setData(flowFileContent)
.setPublishTime(Timestamp.newBuilder().build())
.putAllAttributes(getDynamicAttributesMap(context, flowFile));
if (orderingKey != null) {
message.setOrderingKey(orderingKey);
}
final ApiFuture<String> messageIdFuture = publisher.publish(message.build());
futures.add(messageIdFuture);
flowFile = session.putAttribute(flowFile, TOPIC_NAME_ATTRIBUTE, topicName);
}
try {
ApiFutures.allAsList(futures).get();
successfulFlowFiles.addAll(flowFiles);
} catch (InterruptedException | ExecutionException e) {
getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, "
+ "routing all messages from the batch to failure", topicName, e.getLocalizedMessage(), e);
session.transfer(flowFiles, REL_FAILURE);
context.yield();
}
} finally {
if (!successfulFlowFiles.isEmpty()) {
session.transfer(successfulFlowFiles, REL_SUCCESS);
for (FlowFile flowFile : successfulFlowFiles) {
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
}
}
}
}
@OnStopped
public void onStopped() {
try {
if (publisher != null) {
publisher.stopAsync().awaitTerminated();
publisher = null;
}
} catch (final Exception e) {
getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Publisher", e);
}
}
private Map<String, String> getDynamicAttributesMap(final ProcessContext context, final FlowFile flowFile) {
final Map<String, String> attributes = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
attributes.put(entry.getKey().getName(), value);
}
}
return attributes;
}
private Publisher getPublisher(final ProcessContext context) {
final TopicPath topicPath = TopicPath.parse(context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue());
final PublisherSettings publisherSettings =
PublisherSettings.newBuilder()
.setTopicPath(topicPath)
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong())
.setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue())
.setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS)))
.setIsEnabled(true)
.build())
.build();
return Publisher.create(publisherSettings);
}
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
try {
getPublisher(context);
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Create the Publisher")
.outcome(Outcome.SUCCESSFUL)
.explanation("Successfully created the Google Cloud PubSub Lite Publisher")
.build());
} catch (final Exception e) {
verificationLogger.error("Failed to create Google Cloud PubSub Lite Publisher", e);
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Create the Publisher")
.outcome(Outcome.FAILED)
.explanation("Failed to create Google Cloud PubSub Lite Publisher: " + e.getLocalizedMessage())
.build());
}
return verificationResults;
}
@Override
protected GoogleCredentials getGoogleCredentials(final ProcessContext context) {
return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
}
}

View File

@ -18,8 +18,6 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive

View File

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.lite;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class PublishGCPubSubLiteTest {
private TestRunner runner;
@BeforeEach
void setRunner() {
runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
}
@Test
void testPropertyDescriptors() throws InitializationException {
runner.assertNotValid();
final ControllerService controllerService = new GCPCredentialsControllerService();
final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName();
runner.addControllerService(controllerServiceId, controllerService);
runner.enableControllerService(controllerService);
runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE, controllerServiceId);
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.TOPIC_NAME, "projects/my-project/locations/my-location/topics/my-topic");
runner.assertValid();
runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1");
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15");
runner.assertValid();
runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3");
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB");
runner.assertValid();
runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100");
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100 millis");
runner.assertValid();
}
}