mirror of https://github.com/apache/nifi.git
NIFI-11150 Add Service Account JSON credentials support to Google Pub/Sub Lite procesors
This closes #6933. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
2e55498771
commit
9906f0a952
|
@ -18,6 +18,7 @@ package org.apache.nifi.processors.gcp.pubsub.lite;
|
||||||
|
|
||||||
import com.google.api.gax.core.FixedCredentialsProvider;
|
import com.google.api.gax.core.FixedCredentialsProvider;
|
||||||
import com.google.api.gax.rpc.ApiException;
|
import com.google.api.gax.rpc.ApiException;
|
||||||
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.cloud.pubsub.v1.AckReplyConsumer;
|
import com.google.cloud.pubsub.v1.AckReplyConsumer;
|
||||||
import com.google.cloud.pubsub.v1.MessageReceiver;
|
import com.google.cloud.pubsub.v1.MessageReceiver;
|
||||||
import com.google.cloud.pubsublite.SubscriptionPath;
|
import com.google.cloud.pubsublite.SubscriptionPath;
|
||||||
|
@ -50,7 +51,6 @@ import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
|
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -72,13 +72,12 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH
|
||||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
|
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
|
||||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
|
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
|
||||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
|
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
|
||||||
|
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
|
||||||
|
|
||||||
@SeeAlso({PublishGCPubSubLite.class})
|
@SeeAlso({PublishGCPubSubLite.class})
|
||||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||||
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
|
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
|
||||||
@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription. In its current state, this processor "
|
@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription.")
|
||||||
+ "will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with "
|
|
||||||
+ "'Use Application Default Credentials' or 'Use Compute Engine Credentials'.")
|
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
|
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
|
||||||
@WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION),
|
@WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION),
|
||||||
|
@ -219,7 +218,7 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
|
||||||
message.getConsumer().ack();
|
message.getConsumer().ack();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Subscriber getSubscriber(final ProcessContext context) throws IOException {
|
private Subscriber getSubscriber(final ProcessContext context) {
|
||||||
|
|
||||||
final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
|
final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
|
||||||
|
|
||||||
|
@ -286,4 +285,9 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
|
||||||
}
|
}
|
||||||
return verificationResults;
|
return verificationResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GoogleCredentials getGoogleCredentials(final ProcessContext context) {
|
||||||
|
return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import com.google.api.core.ApiFutures;
|
||||||
import com.google.api.gax.batching.BatchingSettings;
|
import com.google.api.gax.batching.BatchingSettings;
|
||||||
import com.google.api.gax.core.FixedCredentialsProvider;
|
import com.google.api.gax.core.FixedCredentialsProvider;
|
||||||
import com.google.api.gax.rpc.ApiException;
|
import com.google.api.gax.rpc.ApiException;
|
||||||
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.cloud.pubsublite.TopicPath;
|
import com.google.cloud.pubsublite.TopicPath;
|
||||||
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
|
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
|
||||||
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
|
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
|
||||||
|
@ -75,14 +76,13 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_
|
||||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
|
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
|
||||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
|
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
|
||||||
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
|
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
|
||||||
|
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PLATFORM_SCOPE;
|
||||||
|
|
||||||
@SeeAlso({ConsumeGCPubSubLite.class})
|
@SeeAlso({ConsumeGCPubSubLite.class})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"})
|
@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"})
|
||||||
@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." +
|
@CapabilityDescription("Publishes the content of the incoming FlowFile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." +
|
||||||
" If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'. In its current state, this processor will " +
|
" If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
|
||||||
"only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default " +
|
|
||||||
"Credentials' or 'Use Compute Engine Credentials'.")
|
|
||||||
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
|
@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
|
||||||
description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
|
@ -321,4 +321,9 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
|
||||||
}
|
}
|
||||||
return verificationResults;
|
return verificationResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GoogleCredentials getGoogleCredentials(final ProcessContext context) {
|
||||||
|
return super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PLATFORM_SCOPE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@ import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
|
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
|
||||||
|
|
||||||
public class GoogleUtils {
|
public class GoogleUtils {
|
||||||
|
|
||||||
|
public static final String GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
|
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue