diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java index fc135dba81..6602a0e0c8 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java @@ -19,9 +19,11 @@ package org.apache.nifi.processors.gcp.pubsub; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.VerifiableProcessor; @@ -36,9 +38,9 @@ import java.util.Set; public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor implements VerifiableProcessor { - public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new PropertyDescriptor.Builder() .name("gcp-pubsub-publish-batch-size") - .displayName("Batch Size") + .displayName("Batch Size Threshold") .description("Indicates the number of messages the cloud service should bundle together in a batch. If not set and left empty, only one message " + "will be used in a batch") .required(true) @@ -46,6 +48,39 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor imp .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor BATCH_BYTES_THRESHOLD = new PropertyDescriptor.Builder() + .name("gcp-batch-bytes") + .displayName("Batch Bytes Threshold") + .description("Publish request gets triggered based on this Batch Bytes Threshold property and" + + " the " + BATCH_SIZE_THRESHOLD.getDisplayName() + " property, whichever condition is met first.") + .required(true) + .defaultValue("3 MB") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_DELAY_THRESHOLD = new PropertyDescriptor.Builder() + .name("gcp-pubsub-publish-batch-delay") + .displayName("Batch Delay Threshold") + .description("Indicates the delay threshold to use for batching. After this amount of time has elapsed " + + "(counting from the first element added), the elements will be wrapped up in a batch and sent. " + + "This value should not be set too high, usually on the order of milliseconds. Otherwise, calls " + + "might appear to never complete.") + .required(true) + .defaultValue("100 millis") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor API_ENDPOINT = new PropertyDescriptor + .Builder().name("api-endpoint") + .displayName("API Endpoint") + .description("Override the gRPC endpoint in the form of [host:port]") + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .defaultValue(PublisherStubSettings.getDefaultEndpoint()) // identical to SubscriberStubSettings.getDefaultEndpoint() + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("FlowFiles are routed to this relationship after a successful Google Cloud Pub/Sub operation.") diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java index 3928a2ae63..77d9fcac87 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java @@ -102,7 +102,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor { @OnScheduled public void onScheduled(ProcessContext context) { - final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger(); pullRequest = PullRequest.newBuilder() .setMaxMessages(batchSize) @@ -192,7 +192,8 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor { public List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); descriptors.add(SUBSCRIPTION); - descriptors.add(BATCH_SIZE); + descriptors.add(BATCH_SIZE_THRESHOLD); + descriptors.add(API_ENDPOINT); return Collections.unmodifiableList(descriptors); } @@ -268,11 +269,13 @@ public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor { } private SubscriberStub getSubscriber(final ProcessContext context) throws IOException { - final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder() + final String endpoint = context.getProperty(API_ENDPOINT).getValue(); + + final SubscriberStubSettings.Builder subscriberBuilder = SubscriberStubSettings.newBuilder() .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setTransportChannelProvider(getTransportChannelProvider(context)) - .build(); + .setEndpoint(endpoint); - return GrpcSubscriberStub.create(subscriberStubSettings); + return GrpcSubscriberStub.create(subscriberBuilder.build()); } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java index cfcdda5649..b9654112d4 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java @@ -21,6 +21,9 @@ public class PubSubAttributes { public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId"; public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic"; + public static final String RECORDS_ATTRIBUTE = "gcp.pubsub.count.records"; + public static final String RECORDS_DESCRIPTION = "Count of pubsub messages published to the configured Google Cloud PubSub topic"; + public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic"; public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to"; diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java index 86545ab5fd..f2d5ed7ebd 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java @@ -17,13 +17,15 @@ package org.apache.nifi.processors.gcp.pubsub; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.DeadlineExceededException; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; +import com.google.common.util.concurrent.MoreExecutors; import com.google.iam.v1.TestIamPermissionsRequest; import com.google.iam.v1.TestIamPermissionsResponse; import com.google.protobuf.ByteString; @@ -48,11 +50,27 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult; +import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy; +import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.StopWatch; +import org.threeten.bp.Duration; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -64,12 +82,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE; import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION; @@ -82,12 +101,63 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @WritesAttributes({ @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION), + @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = RECORDS_DESCRIPTION), @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION) }) @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content " + "will be read into memory to be sent as a PubSub message.") public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { private static final List REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish"); + private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s"; + + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Input Batch Size") + .displayName("Input Batch Size") + .description("Maximum number of FlowFiles processed for each Processor invocation") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .defaultValue("100") + .build(); + + public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new PropertyDescriptor.Builder() + .name("Message Derivation Strategy") + .displayName("Message Derivation Strategy") + .description("The strategy used to publish the incoming FlowFile to the Google Cloud PubSub endpoint.") + .required(true) + .defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue()) + .allowableValues(MessageDerivationStrategy.class) + .build(); + + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + .displayName("Record Reader") + .description("The Record Reader to use for incoming FlowFiles") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to serialize the data before sending to GCPubSub endpoint") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Message Size") + .displayName("Maximum Message Size") + .description("The maximum size of a Google PubSub message in bytes. Defaults to 1 MB (1048576 bytes)") + .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue()) + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() .name("gcp-pubsub-topic") @@ -103,14 +173,21 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.") .build(); - private Publisher publisher = null; - private final AtomicReference storedException = new AtomicReference<>(); + protected Publisher publisher = null; @Override public List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); + descriptors.add(MAX_BATCH_SIZE); + descriptors.add(MESSAGE_DERIVATION_STRATEGY); + descriptors.add(RECORD_READER); + descriptors.add(RECORD_WRITER); + descriptors.add(MAX_MESSAGE_SIZE); descriptors.add(TOPIC_NAME); - descriptors.add(BATCH_SIZE); + descriptors.add(BATCH_SIZE_THRESHOLD); + descriptors.add(BATCH_BYTES_THRESHOLD); + descriptors.add(BATCH_DELAY_THRESHOLD); + descriptors.add(API_ENDPOINT); return Collections.unmodifiableList(descriptors); } @@ -139,8 +216,7 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { try { publisher = getPublisherBuilder(context).build(); } catch (IOException e) { - getLogger().error("Failed to create Google Cloud PubSub Publisher due to {}", new Object[]{e}); - storedException.set(e); + throw new ProcessException("Failed to create Google Cloud PubSub Publisher", e); } } @@ -171,25 +247,26 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { .setTransportChannelProvider(getTransportChannelProvider(context)) .build(); - final GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings); - final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue(); - final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() - .addAllPermissions(REQUIRED_PERMISSIONS) - .setResource(topicName) - .build(); - final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request); - if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) { - results.add(new ConfigVerificationResult.Builder() - .verificationStepName("Test IAM Permissions") - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName)) - .build()); - } else { - results.add(new ConfigVerificationResult.Builder() - .verificationStepName("Test IAM Permissions") - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName)) - .build()); + try (GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings)) { + final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue(); + final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .addAllPermissions(REQUIRED_PERMISSIONS) + .setResource(topicName) + .build(); + final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request); + if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) { + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Test IAM Permissions") + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName)) + .build()); + } else { + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("Test IAM Permissions") + .outcome(ConfigVerificationResult.Outcome.FAILED) + .explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName)) + .build()); + } } } catch (final ApiException e) { verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e); @@ -213,64 +290,145 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger(); - final List flowFiles = session.get(flowFileCount); - - if (flowFiles.isEmpty() || publisher == null) { - if (storedException.get() != null) { - getLogger().error("Google Cloud PubSub Publisher was not properly created due to {}", new Object[]{storedException.get()}); - } + final StopWatch stopWatch = new StopWatch(true); + final MessageDerivationStrategy inputStrategy = MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue()); + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); + final List flowFileBatch = session.get(maxBatchSize); + if (flowFileBatch.isEmpty()) { context.yield(); - return; + } else if (MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) { + onTriggerFlowFileStrategy(context, session, stopWatch, flowFileBatch); + } else if (MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) { + onTriggerRecordStrategy(context, session, stopWatch, flowFileBatch); + } else { + throw new IllegalStateException(inputStrategy.getValue()); } + } - final long startNanos = System.nanoTime(); - final List successfulFlowFiles = new ArrayList<>(); - final String topicName = getTopicName(context).toString(); + private void onTriggerFlowFileStrategy( + final ProcessContext context, + final ProcessSession session, + final StopWatch stopWatch, + final List flowFileBatch) throws ProcessException { + final long maxMessageSize = context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue(); + final Executor executor = MoreExecutors.directExecutor(); + final List flowFileResults = new ArrayList<>(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + for (final FlowFile flowFile : flowFileBatch) { + final List> futures = new ArrayList<>(); + final List successes = new ArrayList<>(); + final List failures = new ArrayList<>(); + + if (flowFile.getSize() > maxMessageSize) { + final String message = String.format("FlowFile size %d exceeds MAX_MESSAGE_SIZE", flowFile.getSize()); + failures.add(new IllegalArgumentException(message)); + flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures)); + } else { + baos.reset(); + session.exportTo(flowFile, baos); + + final ApiFuture apiFuture = publishOneMessage(context, flowFile, baos.toByteArray()); + futures.add(apiFuture); + addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor); + flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures)); + } + } + finishBatch(session, stopWatch, flowFileResults); + } + + private void onTriggerRecordStrategy( + final ProcessContext context, + final ProcessSession session, + final StopWatch stopWatch, + final List flowFileBatch) throws ProcessException { try { - for (FlowFile flowFile : flowFiles) { - try { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - session.exportTo(flowFile, baos); - final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray()); - - PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent) - .setPublishTime(Timestamp.newBuilder().build()) - .putAllAttributes(getDynamicAttributesMap(context, flowFile)) - .build(); - - ApiFuture messageIdFuture = publisher.publish(message); - - final Map attributes = new HashMap<>(); - attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get()); - attributes.put(TOPIC_NAME_ATTRIBUTE, topicName); - - flowFile = session.putAllAttributes(flowFile, attributes); - successfulFlowFiles.add(flowFile); - } catch (InterruptedException | ExecutionException e) { - if (e.getCause() instanceof DeadlineExceededException) { - getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {} but attempting again may succeed " + - "so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e); - session.transfer(flowFile, REL_RETRY); - } else { - getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}'", topicName, e); - session.transfer(flowFile, REL_FAILURE); - } - context.yield(); - } - } - } finally { - if (!successfulFlowFiles.isEmpty()) { - session.transfer(successfulFlowFiles, REL_SUCCESS); - for (FlowFile flowFile : successfulFlowFiles) { - final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis); - } - } + onTriggerRecordStrategyPublishRecords(context, session, stopWatch, flowFileBatch); + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Record publishing failed", e); } } + private void onTriggerRecordStrategyPublishRecords( + final ProcessContext context, + final ProcessSession session, + final StopWatch stopWatch, + final List flowFileBatch) + throws ProcessException, IOException, SchemaNotFoundException, MalformedRecordException { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + final Executor executor = MoreExecutors.directExecutor(); + final List flowFileResults = new ArrayList<>(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + for (final FlowFile flowFile : flowFileBatch) { + final List> futures = new ArrayList<>(); + final List successes = new ArrayList<>(); + final List failures = new ArrayList<>(); + + final Map attributes = flowFile.getAttributes(); + final RecordReader reader = readerFactory.createRecordReader( + attributes, session.read(flowFile), flowFile.getSize(), getLogger()); + final RecordSet recordSet = reader.createRecordSet(); + final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema()); + + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, baos, attributes); + final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet); + + while (pushBackRecordSet.isAnotherRecord()) { + final ApiFuture apiFuture = publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next()); + futures.add(apiFuture); + addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor); + } + flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures)); + } + finishBatch(session, stopWatch, flowFileResults); + } + + private ApiFuture publishOneRecord( + final ProcessContext context, + final FlowFile flowFile, + final ByteArrayOutputStream baos, + final RecordSetWriter writer, + final Record record) throws IOException { + baos.reset(); + writer.write(record); + writer.flush(); + return publishOneMessage(context, flowFile, baos.toByteArray()); + } + + private ApiFuture publishOneMessage(final ProcessContext context, + final FlowFile flowFile, + final byte[] content) { + final PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(content)) + .setPublishTime(Timestamp.newBuilder().build()) + .putAllAttributes(getDynamicAttributesMap(context, flowFile)) + .build(); + return publisher.publish(message); + } + + private void finishBatch(final ProcessSession session, + final StopWatch stopWatch, + final List flowFileResults) { + final String topicName = publisher.getTopicNameString(); + for (final FlowFileResult flowFileResult : flowFileResults) { + final Relationship relationship = flowFileResult.reconcile(); + final Map attributes = flowFileResult.getAttributes(); + attributes.put(TOPIC_NAME_ATTRIBUTE, topicName); + final FlowFile flowFile = session.putAllAttributes(flowFileResult.getFlowFile(), attributes); + final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, topicName); + session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, relationship); + } + } + + protected void addCallback(final ApiFuture apiFuture, final ApiFutureCallback callback, Executor executor) { + ApiFutures.addCallback(apiFuture, callback, executor); + } + @OnStopped public void onStopped() { shutdownPublisher(); @@ -310,14 +468,22 @@ public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor { } private Publisher.Builder getPublisherBuilder(ProcessContext context) { - final Long batchSize = context.getProperty(BATCH_SIZE).asLong(); + final Long batchSizeThreshold = context.getProperty(BATCH_SIZE_THRESHOLD).asLong(); + final long batchBytesThreshold = context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue(); + final Long batchDelayThreshold = context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS); + final String endpoint = context.getProperty(API_ENDPOINT).getValue(); - return Publisher.newBuilder(getTopicName(context)) + final Publisher.Builder publisherBuilder = Publisher.newBuilder(getTopicName(context)) .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setChannelProvider(getTransportChannelProvider(context)) - .setBatchingSettings(BatchingSettings.newBuilder() - .setElementCountThreshold(batchSize) + .setEndpoint(endpoint); + + publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder() + .setElementCountThreshold(batchSizeThreshold) + .setRequestByteThreshold(batchBytesThreshold) + .setDelayThreshold(Duration.ofMillis(batchDelayThreshold)) .setIsEnabled(true) .build()); + return publisherBuilder; } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java index 603cfb954a..5b74f2015d 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java @@ -112,17 +112,6 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); - public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor - .Builder().name("gcp-batch-bytes") - .displayName("Batch Bytes Threshold") - .description("Publish request gets triggered based on this Batch Bytes Threshold property and" - + " the " + BATCH_SIZE.getDisplayName() + " property, whichever condition is met first.") - .required(true) - .defaultValue("3 MB") - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .build(); - private Publisher publisher = null; @Override @@ -130,8 +119,9 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME, GCP_CREDENTIALS_PROVIDER_SERVICE, ORDERING_KEY, - BATCH_SIZE, - BATCH_BYTES)); + BATCH_SIZE_THRESHOLD, + BATCH_BYTES_THRESHOLD, + BATCH_DELAY_THRESHOLD)); } @Override @@ -193,7 +183,7 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger(); + final int flowFileCount = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger(); final List flowFiles = session.get(flowFileCount); if (flowFiles.isEmpty()) { @@ -290,9 +280,9 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve .setTopicPath(topicPath) .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context))) .setBatchingSettings(BatchingSettings.newBuilder() - .setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong()) - .setDelayThreshold(Duration.ofMillis(100)) - .setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue()) + .setElementCountThreshold(context.getProperty(BATCH_SIZE_THRESHOLD).asLong()) + .setRequestByteThreshold(context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue()) + .setDelayThreshold(Duration.ofMillis(context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS))) .setIsEnabled(true) .build()) .build(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java new file mode 100644 index 0000000000..99300d1abe --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.publish; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.UnavailableException; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes; +import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to Google PubSub endpoint. + */ +public class FlowFileResult { + private static final Logger logger = LoggerFactory.getLogger(FlowFileResult.class); + + private final FlowFile flowFile; + private final Map attributes; + private final List> futures; + private final List successes; + private final List failures; + + public FlowFileResult(final FlowFile flowFile, final List> futures, + final List successes, final List failures) { + this.flowFile = flowFile; + this.attributes = new LinkedHashMap<>(); + this.futures = futures; + this.successes = successes; + this.failures = failures; + } + + /** + * After all in-flight messages have results, calculate appropriate {@link Relationship}. + */ + public Relationship reconcile() { + while (futures.size() > (successes.size() + failures.size())) { + try { + ApiFutures.allAsList(futures).get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("Failed to reconcile PubSub send operation status", e); + } + } + if (futures.size() == successes.size()) { + if (futures.size() == 1) { + attributes.put(PubSubAttributes.MESSAGE_ID_ATTRIBUTE, successes.iterator().next()); + } else { + attributes.put(PubSubAttributes.RECORDS_ATTRIBUTE, Integer.toString(futures.size())); + } + } + return RelationshipMapper.toRelationship(failures); + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public Map getAttributes() { + return attributes; + } + + /** + * Logic to derive an appropriate {@link Relationship} from the feedback provided by the client library. + *

+ * Each {@link com.google.pubsub.v1.PubsubMessage} is associated with a {@link TrackedApiFutureCallback} at time of + * submission to the client library. This callback allows the client library to convey information to the caller + * about the result of the (asynchronous) send. If a send fails, an appropriate exception is conveyed, providing + * detail about the send failure; otherwise a message id (provided by the service) is supplied. + *

+ * Types of exceptions might be classified into "retryable" (another send may be attempted) or non-retryable. + */ + private static class RelationshipMapper { + + private static Relationship toRelationship(final List failures) { + Relationship relationship = PublishGCPubSub.REL_SUCCESS; + boolean isRetry = false; + boolean isFailure = false; + for (final Throwable failure : failures) { + if (isRetryException(failure)) { + isRetry = true; + } else { + isFailure = true; + break; + } + } + if (isFailure) { + relationship = PublishGCPubSub.REL_FAILURE; + } else if (isRetry) { + relationship = PublishGCPubSub.REL_RETRY; + } + return relationship; + } + + /** + * Retryable exceptions indicate transient conditions; another send attempt might succeed. + */ + private static final Collection> RETRY_EXCEPTIONS = Collections.singleton( + UnavailableException.class); + + /** + * Exceptions provided by client library might include a nested exception that indicates a transient condition, + * so the entire exception chain should be checked. + */ + private static boolean isRetryException(final Throwable t) { + if (t == null) { + return false; + } else if (RETRY_EXCEPTIONS.contains(t.getClass())) { + return true; + } else { + final Throwable cause = t.getCause(); + if (t.equals(cause)) { + return false; + } else { + return isRetryException(cause); + } + } + } + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java new file mode 100644 index 0000000000..9ff30e72f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/MessageDerivationStrategy.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.publish; + +import org.apache.nifi.components.DescribedValue; + +/** + * Strategy for publishing data to GCP via PublishGCPubSub processor. + */ +public enum MessageDerivationStrategy implements DescribedValue { + FLOWFILE_ORIENTED("FlowFile Oriented", + "Each incoming FlowFile is sent as a Google Cloud PubSub message"), + RECORD_ORIENTED("Record Oriented", + "Each incoming FlowFile is parsed into NiFi records, which are each sent as a Google Cloud PubSub message"); + + private final String displayName; + + private final String description; + + MessageDerivationStrategy(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java new file mode 100644 index 0000000000..f4b9be8f5f --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/TrackedApiFutureCallback.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.publish; + +import com.google.api.core.ApiFutureCallback; + +import java.util.List; + +/** + * Specialization of {@link ApiFutureCallback} used to track Google PubSub send results. Failure + * exceptions are captured to facilitate FlowFile routing decisions. + */ +public class TrackedApiFutureCallback implements ApiFutureCallback { + private final List successes; + private final List failures; + + public TrackedApiFutureCallback(final List successes, final List failures) { + this.successes = successes; + this.failures = failures; + } + + @Override + public void onFailure(final Throwable t) { + failures.add(t); + } + + @Override + public void onSuccess(final String result) { + successes.add(result); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java index 0d7a1da562..072d8a94c8 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSubIT.java @@ -42,7 +42,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{ runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); - runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "10"); + runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "10"); runner.assertValid(); @@ -64,7 +64,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{ runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); - runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2"); + runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2"); runner.assertValid(); @@ -88,7 +88,7 @@ public class ConsumeGCPubSubIT extends AbstractGCPubSubIT{ runner.setProperty(ConsumeGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(ConsumeGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); runner.setProperty(ConsumeGCPubSub.SUBSCRIPTION, subscription); - runner.setProperty(ConsumeGCPubSub.BATCH_SIZE, "2"); + runner.setProperty(ConsumeGCPubSub.BATCH_SIZE_THRESHOLD, "2"); runner.assertValid(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java index 145eb2b21a..3e6de5b657 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubIT.java @@ -40,7 +40,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{ runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic); runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); - runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1"); + runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1"); runner.assertValid(); @@ -61,7 +61,7 @@ public class PublishGCPubSubIT extends AbstractGCPubSubIT{ runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT_ID); runner.setProperty(PublishGCPubSub.TOPIC_NAME, topic); runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); - runner.setProperty(PublishGCPubSub.BATCH_SIZE, "1"); + runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "1"); runner.assertValid(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java new file mode 100644 index 0000000000..2870942834 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.UnavailableException; +import com.google.cloud.pubsub.v1.Publisher; +import io.grpc.Status; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy; +import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Executor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PublishGCPubSubTest { + + private static final String TOPIC = "my-topic"; + private static final String PROJECT = "my-project"; + + private Throwable throwable; + private Publisher publisherMock; + private TestRunner runner; + + @BeforeEach + void setRunner() { + throwable = null; + publisherMock = mock(Publisher.class); + runner = TestRunners.newTestRunner(new PublishGCPubSub() { + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + publisher = publisherMock; + } + + @Override + protected void addCallback(ApiFuture apiFuture, ApiFutureCallback callback, Executor executor) { + if (callback instanceof TrackedApiFutureCallback) { + final TrackedApiFutureCallback apiFutureCallback = (TrackedApiFutureCallback) callback; + if (throwable == null) { + apiFutureCallback.onSuccess(Long.toString(System.currentTimeMillis())); + } else { + apiFutureCallback.onFailure(throwable); + } + } + } + }); + } + + @Test + void testPropertyDescriptors() throws InitializationException { + runner.assertNotValid(); + + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.assertNotValid(); + + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.assertNotValid(); + + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + runner.assertValid(); + + runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSub.API_ENDPOINT, "localhost:443"); + runner.assertValid(); + + runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "-1"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSub.BATCH_SIZE_THRESHOLD, "15"); + runner.assertValid(); + + runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSub.BATCH_BYTES_THRESHOLD, "3 MB"); + runner.assertValid(); + + runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSub.BATCH_DELAY_THRESHOLD, "100 millis"); + runner.assertValid(); + } + + @Test + void testSendOneSuccessFlowFileStrategy() throws InitializationException { + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + + runner.enqueue("text"); + runner.run(1); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next(); + assertNotNull(flowFile.getAttribute(PubSubAttributes.MESSAGE_ID_ATTRIBUTE)); + } + + @Test + void testSendOneRetryFlowFileStrategy() throws InitializationException { + throwable = new UnavailableException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true); + + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + + runner.enqueue("text"); + runner.run(1); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1); + } + + + @Test + void testSendOneFailureFlowFileStrategy() throws InitializationException { + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + runner.setProperty(PublishGCPubSub.MAX_MESSAGE_SIZE, "16 B"); + runner.enqueue("some really long text"); + + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1); + } + + @Test + void testSendOneSuccessRecordStrategy() throws InitializationException, IOException { + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner)); + runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); + runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); + + runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull( + getClass().getClassLoader().getResource("pubsub/records.json")))); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next(); + assertEquals("3", flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE)); + } + + @Test + void testSendOneRetryRecordStrategy() throws InitializationException, IOException { + throwable = new UnavailableException(null, GrpcStatusCode.of(Status.Code.UNAVAILABLE), true); + + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner)); + runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); + runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); + + runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull( + getClass().getClassLoader().getResource("pubsub/records.json")))); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_RETRY, 1); + } + + @Test + void testSendOneFailureRecordStrategy() throws InitializationException, IOException { + throwable = new IllegalStateException("testSendOne_Failure_RecordStrategy"); + + runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, getCredentialsServiceId(runner)); + runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC); + runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT); + runner.setProperty(PublishGCPubSub.RECORD_READER, getReaderServiceId(runner)); + runner.setProperty(PublishGCPubSub.RECORD_WRITER, getWriterServiceId(runner)); + runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue()); + + runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull( + getClass().getClassLoader().getResource("pubsub/records.json")))); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1); + } + + private static String getCredentialsServiceId(final TestRunner runner) throws InitializationException { + final ControllerService controllerService = mock(GCPCredentialsControllerService.class); + final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName(); + when(controllerService.getIdentifier()).thenReturn(controllerServiceId); + runner.addControllerService(controllerServiceId, controllerService); + runner.enableControllerService(controllerService); + return controllerServiceId; + } + + private static String getReaderServiceId(TestRunner runner) throws InitializationException { + final ControllerService readerService = new JsonTreeReader(); + final String readerServiceId = readerService.getClass().getName(); + runner.addControllerService(readerServiceId, readerService); + runner.enableControllerService(readerService); + return readerServiceId; + } + + + private static String getWriterServiceId(TestRunner runner) throws InitializationException { + final ControllerService writerService = new JsonRecordSetWriter(); + final String writerServiceId = writerService.getClass().getName(); + runner.addControllerService(writerServiceId, writerService); + runner.enableControllerService(writerService); + return writerServiceId; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java new file mode 100644 index 0000000000..9204fc6aff --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLiteTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.pubsub.lite; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PublishGCPubSubLiteTest { + + private TestRunner runner; + + @BeforeEach + void setRunner() { + runner = TestRunners.newTestRunner(PublishGCPubSubLite.class); + } + + @Test + void testPropertyDescriptors() throws InitializationException { + runner.assertNotValid(); + + final ControllerService controllerService = new GCPCredentialsControllerService(); + final String controllerServiceId = GCPCredentialsControllerService.class.getSimpleName(); + runner.addControllerService(controllerServiceId, controllerService); + runner.enableControllerService(controllerService); + runner.setProperty(PublishGCPubSubLite.GCP_CREDENTIALS_PROVIDER_SERVICE, controllerServiceId); + runner.assertNotValid(); + + runner.setProperty(PublishGCPubSubLite.TOPIC_NAME, "projects/my-project/locations/my-location/topics/my-topic"); + runner.assertValid(); + + runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "-1"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSubLite.BATCH_SIZE_THRESHOLD, "15"); + runner.assertValid(); + + runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSubLite.BATCH_BYTES_THRESHOLD, "3 MB"); + runner.assertValid(); + + runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100"); + runner.assertNotValid(); + runner.setProperty(PublishGCPubSubLite.BATCH_DELAY_THRESHOLD, "100 millis"); + runner.assertValid(); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json new file mode 100644 index 0000000000..08ae479321 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.json @@ -0,0 +1,17 @@ +[ + { + "name": "Acme1", + "address": "1234 First Street", + "zip": "12345" + }, + { + "name": "Acme2", + "address": "1234 Second Street", + "zip": "12345" + }, + { + "name": "Acme3", + "address": "1234 Third Street", + "zip": "12345" + } +] \ No newline at end of file