mirror of https://github.com/apache/nifi.git
NIFI-9304 - Adding PublishGCPubSubLite and ConsumeGCPubSubLite processors
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #5460.
This commit is contained in:
parent
c96809012b
commit
37c0527a72
|
@ -64,12 +64,12 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<version>1.16.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.cloud</groupId>
|
||||
<artifactId>google-cloud-core</artifactId>
|
||||
<version>2.1.7</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
|
@ -89,6 +89,11 @@
|
|||
<groupId>com.google.cloud</groupId>
|
||||
<artifactId>google-cloud-pubsub</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.cloud</groupId>
|
||||
<artifactId>google-cloud-pubsublite</artifactId>
|
||||
<version>1.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.tdunning</groupId>
|
||||
<artifactId>json</artifactId>
|
||||
|
@ -109,7 +114,6 @@
|
|||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
@ -152,4 +156,4 @@
|
|||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
</project>
|
||||
|
|
|
@ -1,42 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.gcp.pubsub;
|
||||
|
||||
public class PubSubAttributes {
|
||||
|
||||
public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
|
||||
public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
|
||||
|
||||
public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
|
||||
public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";
|
||||
|
||||
public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId";
|
||||
public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message";
|
||||
|
||||
public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize";
|
||||
public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message";
|
||||
|
||||
public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount";
|
||||
public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any";
|
||||
|
||||
public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime";
|
||||
public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published";
|
||||
|
||||
public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes";
|
||||
public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " +
|
||||
"if the original Google Cloud Publisher client added any attributes to the message while sending";
|
||||
}
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.gcp.pubsub;
|
||||
|
||||
public class PubSubAttributes {
|
||||
|
||||
public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
|
||||
public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
|
||||
|
||||
public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
|
||||
public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";
|
||||
|
||||
public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId";
|
||||
public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message";
|
||||
|
||||
public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize";
|
||||
public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message";
|
||||
|
||||
public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount";
|
||||
public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any";
|
||||
|
||||
public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime";
|
||||
public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published";
|
||||
|
||||
public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes";
|
||||
public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " +
|
||||
"if the original Google Cloud Publisher client added any attributes to the message while sending";
|
||||
|
||||
public static final String ORDERING_KEY_ATTRIBUTE = "gcp.pubsub.ordering.key";
|
||||
public static final String ORDERING_KEY_DESCRIPTION = "If non-empty, identifies related messages for which publish order should be"
|
||||
+ " respected. If a 'Subscription' has 'enable_message_ordering' set to 'true',"
|
||||
+ " messages published with the same non-empty 'ordering_key' value will be"
|
||||
+ " delivered to subscribers in the order in which they are received by the"
|
||||
+ " Pub/Sub system. All 'PubsubMessage's published in a given 'PublishRequest'"
|
||||
+ " must specify the same 'ordering_key' value.";
|
||||
}
|
||||
|
|
|
@ -0,0 +1,289 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.gcp.pubsub.lite;
|
||||
|
||||
import com.google.api.gax.core.FixedCredentialsProvider;
|
||||
import com.google.api.gax.rpc.ApiException;
|
||||
import com.google.cloud.pubsub.v1.AckReplyConsumer;
|
||||
import com.google.cloud.pubsub.v1.MessageReceiver;
|
||||
import com.google.cloud.pubsublite.SubscriptionPath;
|
||||
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
|
||||
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
|
||||
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.pubsub.v1.PubsubMessage;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.VerifiableProcessor;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
|
||||
|
||||
@SeeAlso({PublishGCPubSubLite.class})
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
|
||||
@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription. In its current state, this processor "
|
||||
+ "will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with "
|
||||
+ "'Use Application Default Credentials' or 'Use Compute Engine Credentials'.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
|
||||
@WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION),
|
||||
@WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
|
||||
@WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
|
||||
@WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
|
||||
})
|
||||
public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
|
||||
|
||||
public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
|
||||
.name("gcp-pubsub-subscription")
|
||||
.displayName("Subscription")
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.description("Name of the Google Cloud Pub/Sub Subscription. Example: projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BYTES_OUTSTANDING = new PropertyDescriptor
|
||||
.Builder().name("gcp-bytes-outstanding")
|
||||
.displayName("Bytes Outstanding")
|
||||
.description("The number of quota bytes that may be outstanding to the client.")
|
||||
.required(true)
|
||||
.defaultValue("10 MB")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MESSAGES_OUTSTANDING = new PropertyDescriptor
|
||||
.Builder().name("gcp-messages-outstanding")
|
||||
.displayName("Messages Outstanding")
|
||||
.description("The number of messages that may be outstanding to the client.")
|
||||
.required(true)
|
||||
.defaultValue("1000")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private Subscriber subscriber = null;
|
||||
private static final BlockingQueue<Message> messages = new LinkedBlockingQueue<>();
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final Collection<ValidationResult> results = new ArrayList<ValidationResult>(1);
|
||||
final String subscription = validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
|
||||
|
||||
try {
|
||||
SubscriptionPath.parse(subscription);
|
||||
} catch (final ApiException e) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(SUBSCRIPTION.getName())
|
||||
.input(subscription)
|
||||
.valid(false)
|
||||
.explanation("The Suscription does not have a valid format.")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
try {
|
||||
if (subscriber == null) {
|
||||
subscriber = getSubscriber(context);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", e);
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
try {
|
||||
subscriber.startAsync().awaitRunning();
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", subscriber.failureCause());
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
try {
|
||||
if (subscriber != null) {
|
||||
subscriber.stopAsync().awaitTerminated();
|
||||
subscriber = null;
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Subscriber", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return ImmutableList.of(SUBSCRIPTION,
|
||||
GCP_CREDENTIALS_PROVIDER_SERVICE,
|
||||
BYTES_OUTSTANDING,
|
||||
MESSAGES_OUTSTANDING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return Collections.singleton(REL_SUCCESS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
if (subscriber == null) {
|
||||
getLogger().error("Google Cloud PubSub Lite Subscriber was not properly created. Yielding the processor...");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!subscriber.isRunning()) {
|
||||
getLogger().error("Google Cloud PubSub Lite Subscriber is not running. Yielding the processor...", subscriber.failureCause());
|
||||
throw new ProcessException(subscriber.failureCause());
|
||||
}
|
||||
|
||||
final Message message = messages.poll();
|
||||
if (message == null) {
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
FlowFile flowFile = session.create();
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
|
||||
attributes.put(ORDERING_KEY_ATTRIBUTE, message.getMessage().getOrderingKey());
|
||||
attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
|
||||
attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
|
||||
attributes.putAll(message.getMessage().getAttributesMap());
|
||||
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toStringUtf8().getBytes()));
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().receive(flowFile, context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
|
||||
|
||||
message.getConsumer().ack();
|
||||
}
|
||||
|
||||
private Subscriber getSubscriber(final ProcessContext context) throws IOException {
|
||||
|
||||
final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
|
||||
|
||||
final FlowControlSettings flowControlSettings = FlowControlSettings.builder()
|
||||
.setBytesOutstanding(context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue())
|
||||
.setMessagesOutstanding(context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong())
|
||||
.build();
|
||||
|
||||
final MessageReceiver receiver =
|
||||
(PubsubMessage message, AckReplyConsumer consumer) -> {
|
||||
try {
|
||||
messages.put(new Message(message, consumer));
|
||||
} catch (final InterruptedException e) {
|
||||
getLogger().error("Could not save the message inside the internal queue of the processor", e);
|
||||
}
|
||||
};
|
||||
|
||||
final SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder()
|
||||
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
|
||||
.setSubscriptionPath(subscriptionPath)
|
||||
.setReceiver(receiver)
|
||||
.setPerPartitionFlowControlSettings(flowControlSettings)
|
||||
.build();
|
||||
|
||||
return Subscriber.create(subscriberSettings);
|
||||
}
|
||||
|
||||
private class Message {
|
||||
private PubsubMessage message;
|
||||
private AckReplyConsumer consumer;
|
||||
|
||||
public Message(final PubsubMessage message, final AckReplyConsumer consumer) {
|
||||
this.message = message;
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public PubsubMessage getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public AckReplyConsumer getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
|
||||
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
|
||||
try {
|
||||
getSubscriber(context);
|
||||
verificationResults.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Create the Subscriber")
|
||||
.outcome(Outcome.SUCCESSFUL)
|
||||
.explanation("Successfully created the Google Cloud PubSub Lite Subscriber")
|
||||
.build());
|
||||
} catch (final Exception e) {
|
||||
verificationLogger.error("Failed to create Google Cloud PubSub Lite Subscriber", e);
|
||||
|
||||
verificationResults.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Create the Subscriber")
|
||||
.outcome(Outcome.FAILED)
|
||||
.explanation("Failed to create Google Cloud PubSub Lite Subscriber: " + e.getLocalizedMessage())
|
||||
.build());
|
||||
}
|
||||
return verificationResults;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,325 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.gcp.pubsub.lite;
|
||||
|
||||
import com.google.api.core.ApiFuture;
|
||||
import com.google.api.core.ApiFutures;
|
||||
import com.google.api.gax.batching.BatchingSettings;
|
||||
import com.google.api.gax.core.FixedCredentialsProvider;
|
||||
import com.google.api.gax.rpc.ApiException;
|
||||
import com.google.cloud.pubsublite.TopicPath;
|
||||
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
|
||||
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Timestamp;
|
||||
import com.google.pubsub.v1.PubsubMessage;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SystemResource;
|
||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.ConfigVerificationResult;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.VerifiableProcessor;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
|
||||
import org.threeten.bp.Duration;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
|
||||
|
||||
@SeeAlso({ConsumeGCPubSubLite.class})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"})
|
||||
@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." +
|
||||
" If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'. In its current state, this processor will " +
|
||||
"only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default " +
|
||||
"Credentials' or 'Use Compute Engine Credentials'.")
|
||||
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
|
||||
description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
|
||||
@WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
|
||||
})
|
||||
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
|
||||
+ "will be read into memory to be sent as a PubSub message.")
|
||||
public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
|
||||
|
||||
public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
|
||||
.name("gcp-pubsub-topic")
|
||||
.displayName("Topic Name")
|
||||
.description("Name of the Google Cloud PubSub Topic. Example: projects/8476107443/locations/europe-west1-d/topics/my-lite-topic")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ORDERING_KEY = new PropertyDescriptor
|
||||
.Builder().name("gcp-ordering-key")
|
||||
.displayName("Ordering Key")
|
||||
.description("Messages with the same ordering key will always get published to the same partition. When this property is not "
|
||||
+ "set, messages can get published to different partitions if more than one partition exists for the topic.")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor
|
||||
.Builder().name("gcp-batch-bytes")
|
||||
.displayName("Batch Bytes Threshold")
|
||||
.description("Publish request gets triggered based on this Batch Bytes Threshold property and"
|
||||
+ " the " + BATCH_SIZE.getDisplayName() + " property, whichever condition is met first.")
|
||||
.required(true)
|
||||
.defaultValue("3 MB")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private Publisher publisher = null;
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return ImmutableList.of(TOPIC_NAME,
|
||||
GCP_CREDENTIALS_PROVIDER_SERVICE,
|
||||
ORDERING_KEY,
|
||||
BATCH_SIZE,
|
||||
BATCH_BYTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.required(false)
|
||||
.name(propertyDescriptorName)
|
||||
.displayName(propertyDescriptorName)
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
|
||||
.dynamic(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return Collections.unmodifiableSet(
|
||||
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final Collection<ValidationResult> results = new ArrayList<ValidationResult>(1);
|
||||
final String topic = validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
|
||||
|
||||
try {
|
||||
TopicPath.parse(topic);
|
||||
} catch (final ApiException e) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(TOPIC_NAME.getName())
|
||||
.input(topic)
|
||||
.valid(false)
|
||||
.explanation("The Topic does not have a valid format.")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
try {
|
||||
if (publisher == null) {
|
||||
publisher = getPublisher(context);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", e);
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
try {
|
||||
publisher.startAsync().awaitRunning();
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", publisher.failureCause());
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
|
||||
final List<FlowFile> flowFiles = session.get(flowFileCount);
|
||||
|
||||
if (flowFiles.isEmpty()) {
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
if (publisher == null) {
|
||||
getLogger().error("Google Cloud PubSub Lite Publisher was not properly created. Yielding the processor...");
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
if(!publisher.isRunning()) {
|
||||
getLogger().error("Google Cloud PubSub Lite Publisher is not running. Yielding the processor...", publisher.failureCause());
|
||||
throw new ProcessException(publisher.failureCause());
|
||||
}
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
final List<FlowFile> successfulFlowFiles = new ArrayList<>();
|
||||
final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
|
||||
final List<ApiFuture<String>> futures = new ArrayList<>();
|
||||
|
||||
try {
|
||||
for (FlowFile flowFile : flowFiles) {
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
session.exportTo(flowFile, baos);
|
||||
final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());
|
||||
final String orderingKey = context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
final PubsubMessage.Builder message = PubsubMessage.newBuilder().setData(flowFileContent)
|
||||
.setPublishTime(Timestamp.newBuilder().build())
|
||||
.putAllAttributes(getDynamicAttributesMap(context, flowFile));
|
||||
|
||||
if (orderingKey != null) {
|
||||
message.setOrderingKey(orderingKey);
|
||||
}
|
||||
|
||||
final ApiFuture<String> messageIdFuture = publisher.publish(message.build());
|
||||
futures.add(messageIdFuture);
|
||||
|
||||
flowFile = session.putAttribute(flowFile, TOPIC_NAME_ATTRIBUTE, topicName);
|
||||
}
|
||||
|
||||
try {
|
||||
ApiFutures.allAsList(futures).get();
|
||||
successfulFlowFiles.addAll(flowFiles);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, "
|
||||
+ "routing all messages from the batch to failure", new Object[]{topicName, e.getLocalizedMessage()}, e);
|
||||
session.transfer(flowFiles, REL_FAILURE);
|
||||
context.yield();
|
||||
}
|
||||
} finally {
|
||||
if (!successfulFlowFiles.isEmpty()) {
|
||||
session.transfer(successfulFlowFiles, REL_SUCCESS);
|
||||
for (FlowFile flowFile : successfulFlowFiles) {
|
||||
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
try {
|
||||
if (publisher != null) {
|
||||
publisher.stopAsync().awaitTerminated();
|
||||
publisher = null;
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Publisher", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> getDynamicAttributesMap(final ProcessContext context, final FlowFile flowFile) {
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||
if (entry.getKey().isDynamic()) {
|
||||
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
|
||||
attributes.put(entry.getKey().getName(), value);
|
||||
}
|
||||
}
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
private Publisher getPublisher(final ProcessContext context) {
|
||||
final TopicPath topicPath = TopicPath.parse(context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue());
|
||||
|
||||
final PublisherSettings publisherSettings =
|
||||
PublisherSettings.newBuilder()
|
||||
.setTopicPath(topicPath)
|
||||
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
|
||||
.setBatchingSettings(BatchingSettings.newBuilder()
|
||||
.setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong())
|
||||
.setDelayThreshold(Duration.ofMillis(100))
|
||||
.setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue())
|
||||
.setIsEnabled(true)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
return Publisher.create(publisherSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
|
||||
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
|
||||
try {
|
||||
getPublisher(context);
|
||||
verificationResults.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Create the Publisher")
|
||||
.outcome(Outcome.SUCCESSFUL)
|
||||
.explanation("Successfully created the Google Cloud PubSub Lite Publisher")
|
||||
.build());
|
||||
} catch (final Exception e) {
|
||||
verificationLogger.error("Failed to create Google Cloud PubSub Lite Publisher", e);
|
||||
|
||||
verificationResults.add(new ConfigVerificationResult.Builder()
|
||||
.verificationStepName("Create the Publisher")
|
||||
.outcome(Outcome.FAILED)
|
||||
.explanation("Failed to create Google Cloud PubSub Lite Publisher: " + e.getLocalizedMessage())
|
||||
.build());
|
||||
}
|
||||
return verificationResults;
|
||||
}
|
||||
}
|
|
@ -18,5 +18,7 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
|
|||
org.apache.nifi.processors.gcp.storage.ListGCSBucket
|
||||
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
|
||||
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
|
||||
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
|
||||
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
|
||||
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
|
||||
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
|
|
@ -33,6 +33,7 @@
|
|||
<dependency>
|
||||
<groupId>com.google.auth</groupId>
|
||||
<artifactId>google-auth-library-oauth2-http</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.google.code.findbugs</groupId>
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<packaging>pom</packaging>
|
||||
|
||||
<properties>
|
||||
<google.cloud.sdk.version>0.125.0</google.cloud.sdk.version>
|
||||
<google.cloud.sdk.version>0.162.0</google.cloud.sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
|
|
Loading…
Reference in New Issue