NIFI-5133: Implemented Google Cloud PubSub Processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2724.
This commit is contained in:
zenfenan 2018-05-12 22:51:23 +05:30 committed by Pierre Villard
parent 9aa7e65f70
commit 1663a6c094
10 changed files with 791 additions and 3 deletions

View File

@ -71,6 +71,10 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.tdunning</groupId> <groupId>com.tdunning</groupId>
<artifactId>json</artifactId> <artifactId>json</artifactId>

View File

@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import java.util.List; import java.util.List;
/** /**
@ -110,7 +111,7 @@ public abstract class AbstractGCPProcessor<
* @return GoogleCredentials for the processor to access. * @return GoogleCredentials for the processor to access.
* @see <a href="https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/com/google/api/client/googleapis/auth/oauth2/GoogleCredential">AuthCredentials</a> * @see <a href="https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/com/google/api/client/googleapis/auth/oauth2/GoogleCredential">AuthCredentials</a>
*/ */
private GoogleCredentials getGoogleCredentials(final ProcessContext context) { protected GoogleCredentials getGoogleCredentials(final ProcessContext context) {
final GCPCredentialsService gcpCredentialsService = final GCPCredentialsService gcpCredentialsService =
context.getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class); context.getProperty(GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class);
return gcpCredentialsService.getGoogleCredentials(); return gcpCredentialsService.getGoogleCredentials();
@ -123,7 +124,7 @@ public abstract class AbstractGCPProcessor<
@OnScheduled @OnScheduled
public void onScheduled(ProcessContext context) { public void onScheduled(ProcessContext context) {
final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context)); final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context));
this.cloudService = options.getService(); this.cloudService = options != null ? options.getService() : null;
} }
/** /**

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ServiceOptions;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("gcp-pubsub-publish-batch-size")
.displayName("Batch Size")
.description("Indicates the number of messages the cloud service should bundle together in a batch. If not set and left empty, only one message " +
"will be used in a batch")
.required(true)
.defaultValue("15")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles are routed to this relationship after a successful Google Cloud Pub/Sub operation.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails.")
.build();
private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
return null;
}
}

View File

@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
@SeeAlso({PublishGCPubSub.class})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
"the configured number of messages will be pulled in a single request, else only one message will be pulled.")
@WritesAttributes({
@WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
@WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
@WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
@WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
@WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
})
public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
.name("gcp-pubsub-subscription")
.displayName("Subscription")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.description("Name of the Google Cloud Pub/Sub Subscription")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private SubscriberStub subscriber = null;
private PullRequest pullRequest;
private AtomicReference<Exception> storedException = new AtomicReference<>();
@OnScheduled
public void onScheduled(ProcessContext context) {
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
.setReturnImmediately(false)
.setSubscription(getSubscriptionName(context))
.build();
try {
subscriber = getSubscriber(context);
} catch (IOException e) {
storedException.set(e);
getLogger().error("Failed to create Google Cloud Subscriber due to {}", new Object[]{e});
}
}
@OnStopped
public void onStopped() {
if (subscriber != null) {
subscriber.shutdown();
}
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE,
SUBSCRIPTION,
BATCH_SIZE);
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
if (subscriber == null) {
if (storedException.get() != null) {
getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{storedException.get()});
} else {
getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
}
context.yield();
return;
}
final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
final List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
if (message.hasMessage()) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
ackIds.add(message.getAckId());
attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
attributes.put(SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize()));
attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
attributes.putAll(message.getMessage().getAttributesMap());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, getSubscriptionName(context));
}
}
if (!ackIds.isEmpty()) {
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.addAllAckIds(ackIds)
.setSubscription(getSubscriptionName(context))
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}
}
private String getSubscriptionName(ProcessContext context) {
final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
final String projectId = context.getProperty(PROJECT_ID).getValue();
if (subscriptionName.contains("/")) {
return ProjectSubscriptionName.parse(subscriptionName).toString();
} else {
return ProjectSubscriptionName.of(projectId, subscriptionName).toString();
}
}
private SubscriberStub getSubscriber(ProcessContext context) throws IOException {
final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.build();
return GrpcSubscriberStub.create(subscriberStubSettings);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
public class PubSubAttributes {
public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";
public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId";
public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message";
public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize";
public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message";
public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount";
public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any";
public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime";
public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published";
public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes";
public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " +
"if the original Google Cloud Publisher client added any attributes to the message while sending";
}

View File

@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
@SeeAlso({ConsumeGCPubSub.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties." +
" If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes({
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
@WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
})
public class PublishGCPubSub extends AbstractGCPubSubProcessor{
public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
.name("gcp-pubsub-topic")
.displayName("Topic Name")
.description("Name of the Google Cloud PubSub Topic")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
.build();
private Publisher publisher = null;
private AtomicReference<Exception> storedException = new AtomicReference<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE,
TOPIC_NAME,
BATCH_SIZE);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
@Override
public Set<Relationship> getRelationships() {
return Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY))
);
}
@OnScheduled
public void onScheduled(ProcessContext context) {
try {
publisher = getPublisherBuilder(context).build();
} catch (IOException e) {
getLogger().error("Failed to create Google Cloud PubSub Publisher due to {}", new Object[]{e});
storedException.set(e);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
final List<FlowFile> flowFiles = session.get(flowFileCount);
if (flowFiles.isEmpty() || publisher == null) {
if (storedException.get() != null) {
getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{storedException.get()});
}
context.yield();
return;
}
final long startNanos = System.nanoTime();
final List<FlowFile> successfulFlowFiles = new ArrayList<>();
final String topicName = getTopicName(context).toString();
try {
for (FlowFile flowFile : flowFiles) {
try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final ByteString flowFileContent = ByteString.copyFromUtf8(baos.toString());
PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent)
.setPublishTime(Timestamp.newBuilder().build())
.putAllAttributes(getDynamicAttributesMap(context, flowFile))
.build();
ApiFuture<String> messageIdFuture = publisher.publish(message);
while (messageIdFuture.isDone()) {
Thread.sleep(500L);
}
final String messageId = messageIdFuture.get();
final Map<String, String> attributes = new HashMap<>();
attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
flowFile = session.putAllAttributes(flowFile, attributes);
successfulFlowFiles.add(flowFile);
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof DeadlineExceededException) {
getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
"so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_RETRY);
} else {
getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}",
new Object[]{topicName, e});
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
}
} finally {
if (!successfulFlowFiles.isEmpty()) {
session.transfer(successfulFlowFiles, REL_SUCCESS);
for (FlowFile flowFile : successfulFlowFiles) {
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
}
}
}
}
@OnStopped
public void onStopped() {
shutdownPublisher();
}
private void shutdownPublisher() {
try {
if (publisher != null) {
publisher.shutdown();
}
} catch (Exception e) {
getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher due to {}", new Object[]{e});
}
}
private ProjectTopicName getTopicName(ProcessContext context) {
final String topic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
final String projectId = context.getProperty(PROJECT_ID).getValue();
if (topic.contains("/")) {
return ProjectTopicName.parse(topic);
} else {
return ProjectTopicName.of(projectId, topic);
}
}
private Map<String, String> getDynamicAttributesMap(ProcessContext context, FlowFile flowFile) {
final Map<String, String> attributes = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
attributes.put(entry.getKey().getName(), value);
}
}
return attributes;
}
private Publisher.Builder getPublisherBuilder(ProcessContext context) {
final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
return Publisher.newBuilder(getTopicName(context))
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(batchSize)
.setIsEnabled(true)
.build());
}
}

View File

@ -15,4 +15,6 @@
org.apache.nifi.processors.gcp.storage.PutGCSObject org.apache.nifi.processors.gcp.storage.PutGCSObject
org.apache.nifi.processors.gcp.storage.FetchGCSObject org.apache.nifi.processors.gcp.storage.FetchGCSObject
org.apache.nifi.processors.gcp.storage.DeleteGCSObject org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import java.util.HashMap;
import java.util.Map;
public class AbstractGCPubSubIT {
protected static final String PROJECT_ID = "my-gcm-client";
protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
protected static TestRunner runner;
protected TestRunner setCredentialsCS(TestRunner runner) throws InitializationException {
final String serviceAccountJsonFilePath = "path/to/credentials/json";
final Map<String, String> propertiesMap = new HashMap<>();
final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
propertiesMap.put("application-default-credentials", "false");
propertiesMap.put("compute-engine-credentials", "false");
propertiesMap.put("service-account-json-file", serviceAccountJsonFilePath);
runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
runner.enableControllerService(credentialsControllerService);
runner.assertValid(credentialsControllerService);
return runner;
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
@BeforeClass
public static void setup() throws InitializationException {
runner = TestRunners.newTestRunner(ConsumeGCPubSub.class);
}
@Test
public void testSimpleConsume() throws InitializationException {
final String subscription = "my-sub";
runner.clearTransferState();
runner = setCredentialsCS(runner);
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "10");
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 10);
runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE);
}
@Test
public void testConsumeWithBatchSize() throws InitializationException {
final String subscription = "my-sub";
runner.clearTransferState();
runner = setCredentialsCS(runner);
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2");
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2);
runner.run();
runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 4);
runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE);
}
@Test
public void testConsumeWithFormattedSubscriptionName() throws InitializationException {
final String subscription = "projects/my-gcm-client/subscriptions/my-sub";
runner.clearTransferState();
runner = setCredentialsCS(runner);
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2");
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(ConsumeGCPubSub.REL_SUCCESS, 2);
runner.assertAllFlowFilesContainAttribute(ACK_ID_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(MSG_ATTRIBUTES_COUNT_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(MSG_PUBLISH_TIME_ATTRIBUTE);
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
public class PublishGCPubSubIT extends AbstractGCPubSubIT{
@BeforeClass
public static void setup() throws InitializationException {
runner = TestRunners.newTestRunner(PublishGCPubSub.class);
}
@Test
public void testSimplePublish() throws InitializationException {
final String topic = "my-topic";
runner = setCredentialsCS(runner);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
runner.assertValid();
runner.enqueue("Testing simple publish");
runner.run();
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(MESSAGE_ID_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(TOPIC_NAME_ATTRIBUTE);
}
@Test
public void testPublishWithFormattedTopicName() throws InitializationException {
final String topic = "projects/my-gcm-client/topics/my-topic";
runner = setCredentialsCS(runner);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1");
runner.assertValid();
runner.enqueue("Testing publish with formatted topic name");
runner.run();
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 2);
runner.assertAllFlowFilesContainAttribute(MESSAGE_ID_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(TOPIC_NAME_ATTRIBUTE);
}
}