NIFI-11553 Add Record handling and more Properties for GCP PubSub

- Added Message Derivation Strategy to PublishGCPubSub with FlowFile and Record options
- Added API Endpoint property to PublishGCPPubSub and ConsumeGCPubSub
- Added Batch configuration properties

This closes #7274

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2023-05-16 09:38:39 -04:00 committed by exceptionfactory
parent 962dc9bc38
commit 336b857442
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
13 changed files with 863 additions and 110 deletions

View File

@ -19,9 +19,11 @@ package org.apache.nifi.processors.gcp.pubsub;
import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor; import org.apache.nifi.processor.VerifiableProcessor;
@ -36,9 +38,9 @@ import java.util.Set;
public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor implements VerifiableProcessor { public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor implements VerifiableProcessor {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new PropertyDescriptor.Builder()
.name("gcp-pubsub-publish-batch-size") .name("gcp-pubsub-publish-batch-size")
.displayName("Batch Size") .displayName("Batch Size Threshold")
.description("Indicates the number of messages the cloud service should bundle together in a batch. If not set and left empty, only one message " + .description("Indicates the number of messages the cloud service should bundle together in a batch. If not set and left empty, only one message " +
"will be used in a batch") "will be used in a batch")
.required(true) .required(true)
@ -46,6 +48,39 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor imp
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new PropertyDescriptor.Builder()
.name("gcp-batch-bytes")
.displayName("Batch Bytes Threshold")
.description("Publish request gets triggered based on this Batch Bytes Threshold property and"
+ " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " property, whichever condition is met first.")
.required(true)
.defaultValue("3 MB")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new PropertyDescriptor.Builder()
.name("gcp-pubsub-publish-batch-delay")
.displayName("Batch Delay Threshold")
.description("Indicates the delay threshold to use for batching. After this amount of time has elapsed " +
"(counting from the first element added), the elements will be wrapped up in a batch and sent. " +
"This value should not be set too high, usually on the order of milliseconds. Otherwise, calls " +
"might appear to never complete.")
.required(true)
.defaultValue("100 millis")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor API_ENDPOINT = new PropertyDescriptor
.Builder().name("api-endpoint")
.displayName("API Endpoint")
.description("Override the gRPC endpoint in the form of [host:port]")
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.defaultValue(PublisherStubSettings.getDefaultEndpoint()) // identical to SubscriberStubSettings.getDefaultEndpoint()
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("FlowFiles are routed to this relationship after a successful Google Cloud Pub/Sub operation.") .description("FlowFiles are routed to this relationship after a successful Google Cloud Pub/Sub operation.")

View File

@ -102,7 +102,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
@OnScheduled @OnScheduled
public void onScheduled(ProcessContext context) { public void onScheduled(ProcessContext context) {
final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); final Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
pullRequest = PullRequest.newBuilder() pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize) .setMaxMessages(batchSize)
@ -192,7 +192,8 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(SUBSCRIPTION); descriptors.add(SUBSCRIPTION);
descriptors.add(BATCH_SIZE); descriptors.add(BATCH_SIZE_THRESHOLD);
descriptors.add(API_ENDPOINT);
return Collections.unmodifiableList(descriptors); return Collections.unmodifiableList(descriptors);
} }
@ -268,11 +269,13 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
} }
private SubscriberStub getSubscriber(final ProcessContext context) throws IOException { private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() final String endpoint = context.getProperty(API_ENDPOINT).getValue();
final SubscriberStubSettings.Builder subscriberBuilder = SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setTransportChannelProvider(getTransportChannelProvider(context)) .setTransportChannelProvider(getTransportChannelProvider(context))
.build(); .setEndpoint(endpoint);
return GrpcSubscriberStub.create(subscriberStubSettings); return GrpcSubscriberStub.create(subscriberBuilder.build());
} }
} }

View File

@ -21,6 +21,9 @@ public class PubSubAttributes {
public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId"; public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic"; public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
public static final String RECORDS_ATTRIBUTE = "gcp.pubsub.count.records";
public static final String RECORDS_DESCRIPTION = "Count of pubsub messages published to the configured Google Cloud PubSub topic";
public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic"; public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to"; public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";

View File

@ -17,13 +17,15 @@
package org.apache.nifi.processors.gcp.pubsub; package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.core.ApiFuture; import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.TestIamPermissionsRequest; import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse; import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -48,11 +50,27 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.threeten.bp.Duration;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -64,12 +82,13 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
@ -82,12 +101,63 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION), @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
@WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = RECORDS_DESCRIPTION),
@WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION) @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
}) })
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content " @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
+ "will be read into memory to be sent as a PubSub message.") + "will be read into memory to be sent as a PubSub message.")
public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish"); private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Input Batch Size")
.displayName("Input Batch Size")
.description("Maximum number of FlowFiles processed for each Processor invocation")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.NUMBER_VALIDATOR)
.defaultValue("100")
.build();
public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new PropertyDescriptor.Builder()
.name("Message Derivation Strategy")
.displayName("Message Derivation Strategy")
.description("The strategy used to publish the incoming FlowFile to the Google Cloud PubSub endpoint.")
.required(true)
.defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
.allowableValues(MessageDerivationStrategy.class)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("Record Reader")
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue())
.required(true)
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to GCPubSub endpoint")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue())
.required(true)
.build();
public static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Message Size")
.displayName("Maximum Message Size")
.description("The maximum size of a Google PubSub message in bytes. Defaults to 1 MB (1048576 bytes)")
.dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
.name("gcp-pubsub-topic") .name("gcp-pubsub-topic")
@ -103,14 +173,21 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
.description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.") .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
.build(); .build();
private Publisher publisher = null; protected Publisher publisher = null;
private final AtomicReference<Exception> storedException = new AtomicReference<>();
@Override @Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(MAX_BATCH_SIZE);
descriptors.add(MESSAGE_DERIVATION_STRATEGY);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(MAX_MESSAGE_SIZE);
descriptors.add(TOPIC_NAME); descriptors.add(TOPIC_NAME);
descriptors.add(BATCH_SIZE); descriptors.add(BATCH_SIZE_THRESHOLD);
descriptors.add(BATCH_BYTES_THRESHOLD);
descriptors.add(BATCH_DELAY_THRESHOLD);
descriptors.add(API_ENDPOINT);
return Collections.unmodifiableList(descriptors); return Collections.unmodifiableList(descriptors);
} }
@ -139,8 +216,7 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
try { try {
publisher = getPublisherBuilder(context).build(); publisher = getPublisherBuilder(context).build();
} catch (IOException e) { } catch (IOException e) {
getLogger().error("Failed to create Google Cloud PubSub Publisher due to {}", new Object[]{e}); throw new ProcessException("Failed to create Google Cloud PubSub Publisher", e);
storedException.set(e);
} }
} }
@ -171,25 +247,26 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
.setTransportChannelProvider(getTransportChannelProvider(context)) .setTransportChannelProvider(getTransportChannelProvider(context))
.build(); .build();
final GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings); try (GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings)) {
final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue(); final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
.addAllPermissions(REQUIRED_PERMISSIONS) .addAllPermissions(REQUIRED_PERMISSIONS)
.setResource(topicName) .setResource(topicName)
.build(); .build();
final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request); final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request);
if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) { if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
results.add(new ConfigVerificationResult.Builder() results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Test IAM Permissions") .verificationStepName("Test IAM Permissions")
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName)) .explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName))
.build()); .build());
} else { } else {
results.add(new ConfigVerificationResult.Builder() results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Test IAM Permissions") .verificationStepName("Test IAM Permissions")
.outcome(ConfigVerificationResult.Outcome.FAILED) .outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName)) .explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName))
.build()); .build());
}
} }
} catch (final ApiException e) { } catch (final ApiException e) {
verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e); verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
@ -213,64 +290,145 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger(); final StopWatch stopWatch = new StopWatch(true);
final List<FlowFile> flowFiles = session.get(flowFileCount); final MessageDerivationStrategy inputStrategy = MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
if (flowFiles.isEmpty() || publisher == null) { final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
if (storedException.get() != null) { if (flowFileBatch.isEmpty()) {
getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{storedException.get()});
}
context.yield(); context.yield();
return; } else if (MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
onTriggerFlowFileStrategy(context, session, stopWatch, flowFileBatch);
} else if (MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
onTriggerRecordStrategy(context, session, stopWatch, flowFileBatch);
} else {
throw new IllegalStateException(inputStrategy.getValue());
} }
}
final long startNanos = System.nanoTime(); private void onTriggerFlowFileStrategy(
final List<FlowFile> successfulFlowFiles = new ArrayList<>(); final ProcessContext context,
final String topicName = getTopicName(context).toString(); final ProcessSession session,
final StopWatch stopWatch,
final List<FlowFile> flowFileBatch) throws ProcessException {
final long maxMessageSize = context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
final Executor executor = MoreExecutors.directExecutor();
final List<FlowFileResult> flowFileResults = new ArrayList<>();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (final FlowFile flowFile : flowFileBatch) {
final List<ApiFuture<String>> futures = new ArrayList<>();
final List<String> successes = new ArrayList<>();
final List<Throwable> failures = new ArrayList<>();
if (flowFile.getSize() > maxMessageSize) {
final String message = String.format("FlowFile size %d exceeds MAX_MESSAGE_SIZE", flowFile.getSize());
failures.add(new IllegalArgumentException(message));
flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
} else {
baos.reset();
session.exportTo(flowFile, baos);
final ApiFuture<String> apiFuture = publishOneMessage(context, flowFile, baos.toByteArray());
futures.add(apiFuture);
addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
}
}
finishBatch(session, stopWatch, flowFileResults);
}
private void onTriggerRecordStrategy(
final ProcessContext context,
final ProcessSession session,
final StopWatch stopWatch,
final List<FlowFile> flowFileBatch) throws ProcessException {
try { try {
for (FlowFile flowFile : flowFiles) { onTriggerRecordStrategyPublishRecords(context, session, stopWatch, flowFileBatch);
try { } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); throw new ProcessException("Record publishing failed", e);
session.exportTo(flowFile, baos);
final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());
PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent)
.setPublishTime(Timestamp.newBuilder().build())
.putAllAttributes(getDynamicAttributesMap(context, flowFile))
.build();
ApiFuture<String> messageIdFuture = publisher.publish(message);
final Map<String, String> attributes = new HashMap<>();
attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get());
attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
flowFile = session.putAllAttributes(flowFile, attributes);
successfulFlowFiles.add(flowFile);
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof DeadlineExceededException) {
getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " +
"so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_RETRY);
} else {
getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}'", topicName, e);
session.transfer(flowFile, REL_FAILURE);
}
context.yield();
}
}
} finally {
if (!successfulFlowFiles.isEmpty()) {
session.transfer(successfulFlowFiles, REL_SUCCESS);
for (FlowFile flowFile : successfulFlowFiles) {
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
}
}
} }
} }
private void onTriggerRecordStrategyPublishRecords(
final ProcessContext context,
final ProcessSession session,
final StopWatch stopWatch,
final List<FlowFile> flowFileBatch)
throws ProcessException, IOException, SchemaNotFoundException, MalformedRecordException {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Executor executor = MoreExecutors.directExecutor();
final List<FlowFileResult> flowFileResults = new ArrayList<>();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (final FlowFile flowFile : flowFileBatch) {
final List<ApiFuture<String>> futures = new ArrayList<>();
final List<String> successes = new ArrayList<>();
final List<Throwable> failures = new ArrayList<>();
final Map<String, String> attributes = flowFile.getAttributes();
final RecordReader reader = readerFactory.createRecordReader(
attributes, session.read(flowFile), flowFile.getSize(), getLogger());
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes);
final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
while (pushBackRecordSet.isAnotherRecord()) {
final ApiFuture<String> apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
futures.add(apiFuture);
addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
}
flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
}
finishBatch(session, stopWatch, flowFileResults);
}
private ApiFuture<String> publishOneRecord(
final ProcessContext context,
final FlowFile flowFile,
final ByteArrayOutputStream baos,
final RecordSetWriter writer,
final Record record) throws IOException {
baos.reset();
writer.write(record);
writer.flush();
return publishOneMessage(context, flowFile, baos.toByteArray());
}
private ApiFuture<String> publishOneMessage(final ProcessContext context,
final FlowFile flowFile,
final byte[] content) {
final PubsubMessage message = PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(content))
.setPublishTime(Timestamp.newBuilder().build())
.putAllAttributes(getDynamicAttributesMap(context, flowFile))
.build();
return publisher.publish(message);
}
private void finishBatch(final ProcessSession session,
final StopWatch stopWatch,
final List<FlowFileResult> flowFileResults) {
final String topicName = publisher.getTopicNameString();
for (final FlowFileResult flowFileResult : flowFileResults) {
final Relationship relationship = flowFileResult.reconcile();
final Map<String, String> attributes = flowFileResult.getAttributes();
attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
final FlowFile flowFile = session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, topicName);
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, relationship);
}
}
protected void addCallback(final ApiFuture<String> apiFuture, final ApiFutureCallback<? super String> callback, Executor executor) {
ApiFutures.addCallback(apiFuture, callback, executor);
}
@OnStopped @OnStopped
public void onStopped() { public void onStopped() {
shutdownPublisher(); shutdownPublisher();
@ -310,14 +468,22 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
} }
private Publisher.Builder getPublisherBuilder(ProcessContext context) { private Publisher.Builder getPublisherBuilder(ProcessContext context) {
final Long batchSize = context.getProperty(BATCH_SIZE).asLong(); final Long batchSizeThreshold = context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
final long batchBytesThreshold = context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
final Long batchDelayThreshold = context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
final String endpoint = context.getProperty(API_ENDPOINT).getValue();
return Publisher.newBuilder(getTopicName(context)) final Publisher.Builder publisherBuilder = Publisher.newBuilder(getTopicName(context))
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setChannelProvider(getTransportChannelProvider(context)) .setChannelProvider(getTransportChannelProvider(context))
.setBatchingSettings(BatchingSettings.newBuilder() .setEndpoint(endpoint);
.setElementCountThreshold(batchSize)
publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(batchSizeThreshold)
.setRequestByteThreshold(batchBytesThreshold)
.setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
.setIsEnabled(true) .setIsEnabled(true)
.build()); .build());
return publisherBuilder;
} }
} }

View File

@ -112,17 +112,6 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor
.Builder().name("gcp-batch-bytes")
.displayName("Batch Bytes Threshold")
.description("Publish request gets triggered based on this Batch Bytes Threshold property and"
+ " the " + BATCH_SIZE.getDisplayName() + " property, whichever condition is met first.")
.required(true)
.defaultValue("3 MB")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
private Publisher publisher = null; private Publisher publisher = null;
@Override @Override
@ -130,8 +119,9 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME, return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME,
GCP_CREDENTIALS_PROVIDER_SERVICE, GCP_CREDENTIALS_PROVIDER_SERVICE,
ORDERING_KEY, ORDERING_KEY,
BATCH_SIZE, BATCH_SIZE_THRESHOLD,
BATCH_BYTES)); BATCH_BYTES_THRESHOLD,
BATCH_DELAY_THRESHOLD));
} }
@Override @Override
@ -193,7 +183,7 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger(); final int flowFileCount = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
final List<FlowFile> flowFiles = session.get(flowFileCount); final List<FlowFile> flowFiles = session.get(flowFileCount);
if (flowFiles.isEmpty()) { if (flowFiles.isEmpty()) {
@ -290,9 +280,9 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve
.setTopicPath(topicPath) .setTopicPath(topicPath)
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setBatchingSettings(BatchingSettings.newBuilder() .setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong()) .setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong())
.setDelayThreshold(Duration.ofMillis(100)) .setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue())
.setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue()) .setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS)))
.setIsEnabled(true) .setIsEnabled(true)
.build()) .build())
.build(); .build();

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.publish;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.UnavailableException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to Google PubSub endpoint.
*/
public class FlowFileResult {
private static final Logger logger = LoggerFactory.getLogger(FlowFileResult.class);
private final FlowFile flowFile;
private final Map<String, String> attributes;
private final List<ApiFuture<String>> futures;
private final List<String> successes;
private final List<Throwable> failures;
public FlowFileResult(final FlowFile flowFile, final List<ApiFuture<String>> futures,
final List<String> successes, final List<Throwable> failures) {
this.flowFile = flowFile;
this.attributes = new LinkedHashMap<>();
this.futures = futures;
this.successes = successes;
this.failures = failures;
}
/**
* After all in-flight messages have results, calculate appropriate {@link Relationship}.
*/
public Relationship reconcile() {
while (futures.size() > (successes.size() + failures.size())) {
try {
ApiFutures.allAsList(futures).get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Failed to reconcile PubSub send operation status", e);
}
}
if (futures.size() == successes.size()) {
if (futures.size() == 1) {
attributes.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, successes.iterator().next());
} else {
attributes.put(PubSubAttributes.RECORDS_ATTRIBUTE, Integer.toString(futures.size()));
}
}
return RelationshipMapper.toRelationship(failures);
}
public FlowFile getFlowFile() {
return flowFile;
}
public Map<String, String> getAttributes() {
return attributes;
}
/**
* Logic to derive an appropriate {@link Relationship} from the feedback provided by the client library.
* <p>
* Each {@link com.google.pubsub.v1.PubsubMessage} is associated with a {@link TrackedApiFutureCallback} at time of
* submission to the client library. This callback allows the client library to convey information to the caller
* about the result of the (asynchronous) send. If a send fails, an appropriate exception is conveyed, providing
* detail about the send failure; otherwise a message id (provided by the service) is supplied.
* <p>
* Types of exceptions might be classified into "retryable" (another send may be attempted) or non-retryable.
*/
private static class RelationshipMapper {
private static Relationship toRelationship(final List<Throwable> failures) {
Relationship relationship = PublishGCPubSub.REL_SUCCESS;
boolean isRetry = false;
boolean isFailure = false;
for (final Throwable failure : failures) {
if (isRetryException(failure)) {
isRetry = true;
} else {
isFailure = true;
break;
}
}
if (isFailure) {
relationship = PublishGCPubSub.REL_FAILURE;
} else if (isRetry) {
relationship = PublishGCPubSub.REL_RETRY;
}
return relationship;
}
/**
* Retryable exceptions indicate transient conditions; another send attempt might succeed.
*/
private static final Collection<Class<? extends Throwable>> RETRY_EXCEPTIONS = Collections.singleton(
UnavailableException.class);
/**
* Exceptions provided by client library might include a nested exception that indicates a transient condition,
* so the entire exception chain should be checked.
*/
private static boolean isRetryException(final Throwable t) {
if (t == null) {
return false;
} else if (RETRY_EXCEPTIONS.contains(t.getClass())) {
return true;
} else {
final Throwable cause = t.getCause();
if (t.equals(cause)) {
return false;
} else {
return isRetryException(cause);
}
}
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.publish;
import org.apache.nifi.components.DescribedValue;
/**
* Strategy for publishing data to GCP via <code>PublishGCPubSub</code> processor.
*/
public enum MessageDerivationStrategy implements DescribedValue {
FLOWFILE_ORIENTED("FlowFile Oriented",
"Each incoming FlowFile is sent as a Google Cloud PubSub message"),
RECORD_ORIENTED("Record Oriented",
"Each incoming FlowFile is parsed into NiFi records, which are each sent as a Google Cloud PubSub message");
private final String displayName;
private final String description;
MessageDerivationStrategy(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.publish;
import com.google.api.core.ApiFutureCallback;
import java.util.List;
/**
* Specialization of {@link ApiFutureCallback} used to track Google PubSub send results. Failure
* exceptions are captured to facilitate FlowFile routing decisions.
*/
public class TrackedApiFutureCallback implements ApiFutureCallback<String> {
private final List<String> successes;
private final List<Throwable> failures;
public TrackedApiFutureCallback(final List<String> successes, final List<Throwable> failures) {
this.successes = successes;
this.failures = failures;
}
@Override
public void onFailure(final Throwable t) {
failures.add(t);
}
@Override
public void onSuccess(final String result) {
successes.add(result);
}
}

View File

@ -42,7 +42,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "10"); runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "10");
runner.assertValid(); runner.assertValid();
@ -64,7 +64,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2"); runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2");
runner.assertValid(); runner.assertValid();
@ -88,7 +88,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription);
runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2"); runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2");
runner.assertValid(); runner.assertValid();

View File

@ -40,7 +40,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic); runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1"); runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1");
runner.assertValid(); runner.assertValid();
@ -61,7 +61,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID);
runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic); runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1"); runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1");
runner.assertValid(); runner.assertValid();

View File

@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.UnavailableException;
import com.google.cloud.pubsub.v1.Publisher;
import io.grpc.Status;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PublishGCPubSubTest {
private static final String TOPIC = "my-topic";
private static final String PROJECT = "my-project";
private Throwable throwable;
private Publisher publisherMock;
private TestRunner runner;
@BeforeEach
void setRunner() {
throwable = null;
publisherMock = mock(Publisher.class);
runner = TestRunners.newTestRunner(new PublishGCPubSub() {
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
publisher = publisherMock;
}
@Override
protected void addCallback(ApiFuture<String> apiFuture, ApiFutureCallback<? super String> callback, Executor executor) {
if (callback instanceof TrackedApiFutureCallback) {
final TrackedApiFutureCallback apiFutureCallback = (TrackedApiFutureCallback) callback;
if (throwable == null) {
apiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis()));
} else {
apiFutureCallback.onFailure(throwable);
}
}
}
});
}
@Test
void testPropertyDescriptors() throws InitializationException {
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.assertValid();
runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost");
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443");
runner.assertValid();
runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1");
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15");
runner.assertValid();
runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3");
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB");
runner.assertValid();
runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100");
runner.assertNotValid();
runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 millis");
runner.assertValid();
}
@Test
void testSendOneSuccessFlowFileStrategy() throws InitializationException {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.enqueue("text");
runner.run(1);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
assertNotNull(flowFile.getAttribute(PubSubAttributes.MESSAGE_ID_ATTRIBUTE));
}
@Test
void testSendOneRetryFlowFileStrategy() throws InitializationException {
throwable = new UnavailableException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.enqueue("text");
runner.run(1);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
}
@Test
void testSendOneFailureFlowFileStrategy() throws InitializationException {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.MAX_MESSAGE_SIZE, "16 B");
runner.enqueue("some really long text");
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
}
@Test
void testSendOneSuccessRecordStrategy() throws InitializationException, IOException {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource("pubsub/records.json"))));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
assertEquals("3", flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE));
}
@Test
void testSendOneRetryRecordStrategy() throws InitializationException, IOException {
throwable = new UnavailableException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true);
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource("pubsub/records.json"))));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1);
}
@Test
void testSendOneFailureRecordStrategy() throws InitializationException, IOException {
throwable = new IllegalStateException("testSendOne_Failure_RecordStrategy");
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner));
runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue());
runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource("pubsub/records.json"))));
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
}
private static String getCredentialsServiceId(final TestRunner runner) throws InitializationException {
final ControllerService controllerService = mock(GCPCredentialsControllerService.class);
final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName();
when(controllerService.getIdentifier()).thenReturn(controllerServiceId);
runner.addControllerService(controllerServiceId, controllerService);
runner.enableControllerService(controllerService);
return controllerServiceId;
}
private static String getReaderServiceId(TestRunner runner) throws InitializationException {
final ControllerService readerService = new JsonTreeReader();
final String readerServiceId = readerService.getClass().getName();
runner.addControllerService(readerServiceId, readerService);
runner.enableControllerService(readerService);
return readerServiceId;
}
private static String getWriterServiceId(TestRunner runner) throws InitializationException {
final ControllerService writerService = new JsonRecordSetWriter();
final String writerServiceId = writerService.getClass().getName();
runner.addControllerService(writerServiceId, writerService);
runner.enableControllerService(writerService);
return writerServiceId;
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.pubsub.lite;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class PublishGCPubSubLiteTest {
private TestRunner runner;
@BeforeEach
void setRunner() {
runner = TestRunners.newTestRunner(PublishGCPubSubLite.class);
}
@Test
void testPropertyDescriptors() throws InitializationException {
runner.assertNotValid();
final ControllerService controllerService = new GCPCredentialsControllerService();
final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName();
runner.addControllerService(controllerServiceId, controllerService);
runner.enableControllerService(controllerService);
runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE, controllerServiceId);
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.TOPIC_NAME, "projects/my-project/locations/my-location/topics/my-topic");
runner.assertValid();
runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1");
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15");
runner.assertValid();
runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3");
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB");
runner.assertValid();
runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100");
runner.assertNotValid();
runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100 millis");
runner.assertValid();
}
}

View File

@ -0,0 +1,17 @@
[
{
"name": "Acme1",
"address": "1234 First Street",
"zip": "12345"
},
{
"name": "Acme2",
"address": "1234 Second Street",
"zip": "12345"
},
{
"name": "Acme3",
"address": "1234 Third Street",
"zip": "12345"
}
]