From 98af3dc4cd56da324bb4572aab2dc395338ed45d Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 8 Nov 2017 23:20:58 +0900 Subject: [PATCH] NIFI-4595: This closes #2264. Add ConsumeAzureEventHub. Signed-off-by: joewitt --- .../src/main/resources/META-INF/LICENSE | 6 +- .../src/main/resources/META-INF/NOTICE | 21 + .../nifi-azure-processors/pom.xml | 18 +- .../azure/eventhub/ConsumeAzureEventHub.java | 654 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../eventhub/TestConsumeAzureEventHub.java | 397 +++++++++++ 6 files changed, 1091 insertions(+), 6 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE index c527bf72c5..a821684d56 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE @@ -206,11 +206,7 @@ The binary distribution of this product bundles source from 'Microsoft Azure IoT Device Libraries'. The source is available under an MIT LICENSE. - Copyright (c) Microsoft Corporation - - All rights reserved. - - MIT License + Copyright (c) 2016 Microsoft Azure Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE index 09107c153a..25452fbbe8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE @@ -23,6 +23,27 @@ The following binary components are provided under the Apache Software License v Apache Commons IO Copyright 2002-2016 The Apache Software Foundation + (ASLv2) Google GSON + The following NOTICE information applies: + Copyright 2008 Google Inc. + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. 
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index e43bfe05ac..49d0afd885 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -19,6 +19,9 @@
     <artifactId>nifi-azure-processors</artifactId>
     <packaging>jar</packaging>
 
+    <properties>
+        <azure-eventhubs.version>0.14.4</azure-eventhubs.version>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -28,10 +31,23 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs</artifactId>
-            <version>0.14.2</version>
+            <version>${azure-eventhubs.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-eventhubs-eph</artifactId>
+            <version>${azure-eventhubs.version}</version>
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
new file mode 100644
index 0000000000..dc180975f3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.azure.eventhub; + +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventprocessorhost.CloseReason; +import com.microsoft.azure.eventprocessorhost.EventProcessorHost; +import com.microsoft.azure.eventprocessorhost.EventProcessorOptions; +import com.microsoft.azure.eventprocessorhost.IEventProcessor; +import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory; +import com.microsoft.azure.eventprocessorhost.PartitionContext; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.ReceiverDisconnectedException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import static org.apache.nifi.util.StringUtils.isEmpty; + +@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"}) +@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@TriggerSerially +@WritesAttributes({ + @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), + @WritesAttribute(attribute = 
"eventhub.offset", description = "The offset into the partition at which the message was stored"), + @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), + @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), + @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled") +}) +public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { + + static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() + .name("event-hub-namespace") + .displayName("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() + .name("event-hub-name") + .displayName("Event Hub Name") + .description("The name of the Azure Event Hub to pull messages from.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + // TODO: Do we need to support custom service endpoints as GetAzureEventHub does? Is it possible? + static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder() + .name("event-hub-shared-access-policy-name") + .displayName("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .name("event-hub-shared-access-policy-primary-key") + .displayName("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .sensitive(true) + .required(true) + .build(); + static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder() + .name("event-hub-consumer-group") + .displayName("Event Hub Consumer Group") + .description("The name of the Event Hub Consumer Group to use.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("$Default") + .required(true) + .build(); + static final PropertyDescriptor CONSUMER_HOSTNAME = new PropertyDescriptor.Builder() + .name("event-hub-consumer-hostname") + .displayName("Event Hub Consumer Hostname") + .description("The hostname of this Event Hub Consumer instance." + + " If not specified, an unique identifier is generated in 'nifi-' format.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The Record Reader to use for reading received messages." 
+ + " The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema.") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(false) + .required(false) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("The Record Writer to use for serializing Records to an output FlowFile." + + " The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema." + + " If not specified, each message will create a FlowFile.") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(false) + .required(false) + .build(); + + static final AllowableValue INITIAL_OFFSET_START_OF_STREAM = new AllowableValue( + "start-of-stream", "Start of stream", "Read from the oldest message retained in the stream."); + static final AllowableValue INITIAL_OFFSET_END_OF_STREAM = new AllowableValue( + "end-of-stream", "End of stream", + "Ignore old retained messages even if exist, start reading new ones from now."); + static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder() + .name("event-hub-initial-offset") + .displayName("Initial Offset") + .description("Specify where to start receiving messages if offset is not stored in Azure Storage.") + .required(true) + .allowableValues(INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM) + .defaultValue(INITIAL_OFFSET_END_OF_STREAM.getValue()) + .build(); + static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder() + .name("event-hub-prefetch-count") + .displayName("Prefetch Count") + .defaultValue("The number of messages to fetch from Event Hub before processing." + + " This parameter affects throughput." + + " The more prefetch count, the better throughput in general, but consumes more resources (RAM)." + + " NOTE: Even though Event Hub client API provides this option," + + " actual number of messages can be pre-fetched is depend on the Event Hub server implementation." + + " It is reported that only one event is received at a time in certain situation." + + " https://github.com/Azure/azure-event-hubs-java/issues/125") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("300") + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("event-hub-batch-size") + .displayName("Batch Size") + .description("The number of messages to process within a NiFi session." + + " This parameter affects throughput and consistency." + + " NiFi commits its session and Event Hub checkpoint after processing this number of messages." + + " If NiFi session is committed, but failed to create an Event Hub checkpoint," + + " then it is possible that the same messages to be retrieved again." 
+ + " The higher number, the higher throughput, but possibly less consistent.") + .defaultValue("10") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder() + .name("event-hub-message-receive-timeout") + .displayName("Message Receive Timeout") + .description("The amount of time this consumer should wait to receive the Prefetch Count before returning.") + .defaultValue("1 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder() + .name("storage-account-name") + .displayName("Storage Account Name") + .description("Name of the Azure Storage account to store Event Hub Consumer Group state.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder() + .name("storage-account-key") + .displayName("Storage Account Key") + .description("The Azure Storage account key to store Event Hub Consumer Group state.") + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder() + .name("storage-container-name") + .displayName("Storage Container Name") + .description("Name of the Azure Storage Container to store Event Hub Consumer Group state." + + " If not specified, Event Hub name is used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles received from Event Hub.") + .build(); + + static final Relationship REL_PARSE_FAILURE = new Relationship.Builder() + .name("parse.failure") + .description("If a message from Event Hub cannot be parsed using the configured Record Reader" + + " or failed to be written by the configured Record Writer," + + " the contents of the message will be routed to this Relationship as its own individual FlowFile.") + .build(); + + private static final Set RELATIONSHIPS; + private static final Set RECORD_RELATIONSHIPS; + private static final List PROPERTIES; + + static { + PROPERTIES = Collections.unmodifiableList(Arrays.asList( + NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME, + RECORD_READER, RECORD_WRITER, + INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, + STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME + )); + + Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + RELATIONSHIPS = Collections.unmodifiableSet(relationships); + + relationships.add(REL_PARSE_FAILURE); + RECORD_RELATIONSHIPS = Collections.unmodifiableSet(relationships); + } + + private volatile EventProcessorHost eventProcessorHost; + private volatile ProcessSessionFactory processSessionFactory; + private volatile RecordReaderFactory readerFactory; + private volatile RecordSetWriterFactory writerFactory; + // The namespace name can not be retrieved from a PartitionContext at EventProcessor.onEvents, so keep it here. 
+    private volatile String namespaceName;
+    private volatile boolean isRecordReaderSet = false;
+    private volatile boolean isRecordWriterSet = false;
+
+    /**
+     * For unit test.
+     */
+    void setProcessSessionFactory(ProcessSessionFactory processSessionFactory) {
+        this.processSessionFactory = processSessionFactory;
+    }
+
+    /**
+     * For unit test.
+     */
+    void setNamespaceName(String namespaceName) {
+        this.namespaceName = namespaceName;
+    }
+
+    /**
+     * For unit test.
+     */
+    public void setReaderFactory(RecordReaderFactory readerFactory) {
+        this.readerFactory = readerFactory;
+    }
+
+    /**
+     * For unit test.
+     */
+    public void setWriterFactory(RecordSetWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return isRecordReaderSet && isRecordWriterSet ? RECORD_RELATIONSHIPS : RELATIONSHIPS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        final ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService();
+        final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
+        if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Record Reader and Writer")
+                    .explanation(String.format("Both %s and %s should be set in order to write FlowFiles as Records.",
+                            RECORD_READER.getDisplayName(), RECORD_WRITER.getDisplayName()))
+                    .valid(false)
+                    .build());
+        }
+        return results;
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if (RECORD_READER.equals(descriptor)) {
+            isRecordReaderSet = !StringUtils.isEmpty(newValue);
+        } else if (RECORD_WRITER.equals(descriptor)) {
+            isRecordWriterSet = !StringUtils.isEmpty(newValue);
+        }
+    }
+
+    public class EventProcessorFactory implements IEventProcessorFactory<IEventProcessor> {
+        @Override
+        public IEventProcessor createEventProcessor(PartitionContext context) throws Exception {
+            return new EventProcessor();
+        }
+    }
+
+    public class EventProcessor implements IEventProcessor {
+
+        @Override
+        public void onOpen(PartitionContext context) throws Exception {
+            getLogger().info("Consumer group {} opened partition {} of {}",
+                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath()});
+        }
+
+        @Override
+        public void onClose(PartitionContext context, CloseReason reason) throws Exception {
+            getLogger().info("Consumer group {} closed partition {} of {}. reason={}",
+                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath(), reason});
+        }
+
+        @Override
+        public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception {
+            final ProcessSession session = processSessionFactory.createSession();
+
+            try {
+
+                final StopWatch stopWatch = new StopWatch(true);
+
+                if (readerFactory != null && writerFactory != null) {
+                    writeRecords(context, messages, session, stopWatch);
+                } else {
+                    writeFlowFiles(context, messages, session, stopWatch);
+                }
+
+                // Commit NiFi first.
+                session.commit();
+                // If creating an Event Hub checkpoint fails, the same messages can be retrieved again.
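+                // This ordering gives at-least-once delivery: duplicates are possible, but data loss is not.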
+                context.checkpoint();
+
+            } catch (Exception e) {
+                getLogger().error("Unable to fully process received message due to " + e, e);
+                // FlowFiles that have already been committed will not be rolled back.
+                session.rollback();
+            }
+
+        }
+
+        private void putEventHubAttributes(Map<String, String> attributes, String eventHubName, String partitionId, EventData eventData) {
+            final EventData.SystemProperties systemProperties = eventData.getSystemProperties();
+            if (null != systemProperties) {
+                attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
+                attributes.put("eventhub.offset", systemProperties.getOffset());
+                attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
+            }
+
+            attributes.put("eventhub.name", eventHubName);
+            attributes.put("eventhub.partition", partitionId);
+        }
+
+        private void writeFlowFiles(PartitionContext context, Iterable<EventData> messages, ProcessSession session, StopWatch stopWatch) {
+            final String eventHubName = context.getEventHubPath();
+            final String partitionId = context.getPartitionId();
+            final String consumerGroup = context.getConsumerGroupName();
+            messages.forEach(eventData -> {
+                FlowFile flowFile = session.create();
+
+                final Map<String, String> attributes = new HashMap<>();
+                putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
+
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                flowFile = session.write(flowFile, out -> {
+                    out.write(eventData.getBytes());
+                });
+
+                transferTo(REL_SUCCESS, session, stopWatch, eventHubName, partitionId, consumerGroup, flowFile);
+            });
+        }
+
+        private void transferTo(Relationship relationship, ProcessSession session, StopWatch stopWatch,
+                                String eventHubName, String partitionId, String consumerGroup, FlowFile flowFile) {
+            session.transfer(flowFile, relationship);
+            final String transitUri = "amqps://" + namespaceName + ".servicebus.windows.net/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
+            session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        }
+
+        private void writeRecords(PartitionContext context, Iterable<EventData> messages, ProcessSession session, StopWatch stopWatch)
+                throws SchemaNotFoundException, IOException {
+
+            final String eventHubName = context.getEventHubPath();
+            final String partitionId = context.getPartitionId();
+            final String consumerGroup = context.getConsumerGroupName();
+            final Map<String, String> schemaRetrievalVariables = new HashMap<>();
+            schemaRetrievalVariables.put("eventhub.name", eventHubName);
+
+            final ComponentLog logger = getLogger();
+            FlowFile flowFile = session.create();
+            final Map<String, String> attributes = new HashMap<>();
+
+            RecordSetWriter writer = null;
+            EventData lastEventData = null;
+            WriteResult lastWriteResult = null;
+            int recordCount = 0;
+
+            try (final OutputStream out = session.write(flowFile)) {
+                for (final EventData eventData : messages) {
+
+                    try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) {
+                        final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger);
+
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if (writer == null) {
+                                // Initialize the writer when the first record is read.
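+                                // The write schema is derived from the first record and reused for the remaining records in this batch.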
+                                final RecordSchema readerSchema = record.getSchema();
+                                final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
+                                writer = writerFactory.createWriter(logger, writeSchema, out);
+                                writer.beginRecordSet();
+                            }
+
+                            lastWriteResult = writer.write(record);
+                            recordCount += lastWriteResult.getRecordCount();
+                        }
+
+                        lastEventData = eventData;
+
+                    } catch (Exception e) {
+                        // Write it to the parse failure relationship.
+                        logger.error("Failed to parse message from Azure Event Hub using configured Record Reader and Writer due to " + e, e);
+                        FlowFile failed = session.create();
+                        session.write(failed, o -> o.write(eventData.getBytes()));
+                        putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
+                        failed = session.putAllAttributes(failed, attributes);
+                        transferTo(REL_PARSE_FAILURE, session, stopWatch, eventHubName, partitionId, consumerGroup, failed);
+                    }
+                }
+
+                if (lastEventData != null) {
+                    putEventHubAttributes(attributes, eventHubName, partitionId, lastEventData);
+
+                    attributes.put("record.count", String.valueOf(recordCount));
+                    if (writer != null) {
+                        writer.finishRecordSet();
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                        if (lastWriteResult != null) {
+                            attributes.putAll(lastWriteResult.getAttributes());
+                        }
+
+                        try {
+                            writer.close();
+                        } catch (IOException e) {
+                            logger.warn("Failed to close Record Writer due to " + e, e);
+                        }
+                    }
+                }
+            }
+
+            // This part has to be outside of the 'session.write(flowFile)' code block.
+            if (lastEventData != null) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                transferTo(REL_SUCCESS, session, stopWatch, eventHubName, partitionId, consumerGroup, flowFile);
+            } else {
+                // If there's no successful event data, then remove the FlowFile.
+                session.remove(flowFile);
+            }
+        }
+
+        @Override
+        public void onError(PartitionContext context, Throwable e) {
+            if (e instanceof ReceiverDisconnectedException && e.getMessage().startsWith("New receiver with higher epoch of ")) {
+                // This is a known behavior in a NiFi cluster where multiple nodes consume from the same Event Hub.
+                // Once another node connects, some partitions are given to that node to distribute the consumer load.
+                // When that happens, this exception is thrown.
+                getLogger().info("New receiver took over partition {} of Azure Event Hub {}, consumerGroupName={}, message={}",
+                        new Object[]{context.getPartitionId(), context.getEventHubPath(), context.getConsumerGroupName(), e.getMessage()});
+                return;
+            }
+            getLogger().error("An error occurred while receiving messages from Azure Event Hub {} at partition {},"
+                    + " consumerGroupName={}, exception={}",
+                    new Object[]{context.getEventHubPath(), context.getPartitionId(), context.getConsumerGroupName(), e}, e);
+        }
+
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+        if (eventProcessorHost == null) {
+            try {
+                registerEventProcessor(context);
+            } catch (IllegalArgumentException e) {
+                // To show a simple error message without wrapping it in another ProcessException, rethrow it as it is.
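+                // registerEventProcessor throws IllegalArgumentException when a required property is missing or the initial offset is invalid.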
+                throw e;
+            } catch (Exception e) {
+                throw new ProcessException("Failed to register the event processor due to " + e, e);
+            }
+            processSessionFactory = sessionFactory;
+
+            readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        }
+
+        // After an EventProcessor is registered successfully, nothing has to be done at onTrigger
+        // because the EventProcessor creates new sessions as new messages arrive.
+        context.yield();
+    }
+
+    @OnStopped
+    public void unregisterEventProcessor(final ProcessContext context) {
+        if (eventProcessorHost != null) {
+            try {
+                eventProcessorHost.unregisterEventProcessor();
+                eventProcessorHost = null;
+                processSessionFactory = null;
+                readerFactory = null;
+                writerFactory = null;
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to unregister the event processor due to " + e, e);
+            }
+        }
+    }
+
+    private void registerEventProcessor(final ProcessContext context) throws Exception {
+        // Validate required properties.
+        final String consumerGroupName = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(CONSUMER_GROUP, consumerGroupName);
+
+        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(NAMESPACE, namespaceName);
+
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
+
+        final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
+
+        final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
+
+        final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
+
+        final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(STORAGE_ACCOUNT_KEY, storageAccountKey);
+
+
+        final String consumerHostname = orDefault(context.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(),
+                EventProcessorHost.createHostName("nifi"));
+
+        final String containerName = orDefault(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
+                eventHubName);
+
+
+        final EventProcessorOptions options = new EventProcessorOptions();
+        final String initialOffset = context.getProperty(INITIAL_OFFSET).getValue();
+        if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
+            options.setInitialOffsetProvider(options.new StartOfStreamInitialOffsetProvider());
+        } else if (INITIAL_OFFSET_END_OF_STREAM.getValue().equals(initialOffset)) {
+            options.setInitialOffsetProvider(options.new EndOfStreamInitialOffsetProvider());
+        } else {
+            throw new IllegalArgumentException("Initial offset " + initialOffset + " is not allowed.");
+        }
+
+        final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
+        if (prefetchCount != null && prefetchCount > 0) {
+            options.setPrefetchCount(prefetchCount);
+        }
+
+        final Integer batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + if (batchSize != null && batchSize > 0) { + options.setMaxBatchSize(batchSize); + } + + final Long receiveTimeoutMillis = context.getProperty(RECEIVE_TIMEOUT) + .evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis)); + + final String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, storageAccountName, storageAccountKey); + + final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey); + + eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName); + + options.setExceptionNotification(e -> { + getLogger().error("An error occurred while receiving messages from Azure Event Hub {}" + + " at consumer group {} and partition {}, action={}, hostname={}, exception={}", + new Object[]{eventHubName, consumerGroupName, e.getPartitionId(), e.getAction(), e.getHostname()}, e.getException()); + }); + + + eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), options).get(); + } + + private String orDefault(String value, String defaultValue) { + return isEmpty(value) ? defaultValue : value; + } + + private void validateRequiredProperty(PropertyDescriptor property, String value) { + if (isEmpty(value)) { + throw new IllegalArgumentException(String.format("'%s' is required, but not specified.", property.getDisplayName())); + } + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 84b3300f4c..073d6745ed 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,6 +14,7 @@ # limitations under the License. org.apache.nifi.processors.azure.eventhub.PutAzureEventHub org.apache.nifi.processors.azure.eventhub.GetAzureEventHub +org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage org.apache.nifi.processors.azure.storage.ListAzureBlobStorage org.apache.nifi.processors.azure.storage.PutAzureBlobStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java new file mode 100644 index 0000000000..1f54f392b9 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub; + +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventprocessorhost.PartitionContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.SharedSessionState; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestConsumeAzureEventHub { + + private ConsumeAzureEventHub.EventProcessor eventProcessor; + private MockProcessSession processSession; + private SharedSessionState sharedState; + private PartitionContext partitionContext; + private ConsumeAzureEventHub processor; + + @Before + public void setupProcessor() { + processor = new ConsumeAzureEventHub(); + final ProcessorInitializationContext initContext = Mockito.mock(ProcessorInitializationContext.class); + final String componentId = "componentId"; + when(initContext.getIdentifier()).thenReturn(componentId); + MockComponentLog componentLog = new MockComponentLog(componentId, processor); + when(initContext.getLogger()).thenReturn(componentLog); + processor.initialize(initContext); + + final ProcessSessionFactory processSessionFactory = Mockito.mock(ProcessSessionFactory.class); + 
processor.setProcessSessionFactory(processSessionFactory);
+        processor.setNamespaceName("namespace");
+
+        sharedState = new SharedSessionState(processor, new AtomicLong(0));
+        processSession = new MockProcessSession(sharedState, processor);
+        when(processSessionFactory.createSession()).thenReturn(processSession);
+
+        eventProcessor = processor.new EventProcessor();
+
+        partitionContext = Mockito.mock(PartitionContext.class);
+        when(partitionContext.getEventHubPath()).thenReturn("eventhub-name");
+        when(partitionContext.getPartitionId()).thenReturn("partition-id");
+        when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
+    }
+
+    @Test
+    public void testReceiveOne() throws Exception {
+
+        final Iterable<EventData> eventDataList = Arrays.asList(new EventData("one".getBytes(StandardCharsets.UTF_8)));
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertContentEquals("one");
+        msg1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        msg1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/"
+                + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+    }
+
+
+    @Test
+    public void testReceiveTwo() throws Exception {
+
+        final Iterable<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8))
+        );
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(2, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertContentEquals("one");
+        final MockFlowFile msg2 = flowFiles.get(1);
+        msg2.assertContentEquals("two");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+    }
+
+    @Test
+    public void testCheckpointFailure() throws Exception {
+
+        final Iterable<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8))
+        );
+        doThrow(new RuntimeException("Failed to create a checkpoint.")).when(partitionContext).checkpoint();
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        // Even if it fails to create a checkpoint, these FlowFiles are already committed.
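+        // They may be received again after a restart, matching the processor's at-least-once delivery.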
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(2, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertContentEquals("one");
+        final MockFlowFile msg2 = flowFiles.get(1);
+        msg2.assertContentEquals("two");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+    }
+
+    private Record toRecord(String value) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("value", value);
+        return new MapRecord(new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("value", RecordFieldType.STRING.getDataType()))), map);
+    }
+
+    private void setupRecordWriter() throws SchemaNotFoundException, IOException {
+        setupRecordWriter(null);
+    }
+
+    private void setupRecordWriter(String throwErrorWith) throws SchemaNotFoundException, IOException {
+        final RecordSetWriterFactory writerFactory = mock(RecordSetWriterFactory.class);
+        processor.setWriterFactory(writerFactory);
+        final RecordSetWriter writer = mock(RecordSetWriter.class);
+        final AtomicReference<OutputStream> outRef = new AtomicReference<>();
+        when(writerFactory.createWriter(any(), any(), any())).thenAnswer(invocation -> {
+            outRef.set(invocation.getArgumentAt(2, OutputStream.class));
+            return writer;
+        });
+        when(writer.write(any(Record.class))).thenAnswer(invocation -> {
+            final String value = (String) invocation.getArgumentAt(0, Record.class).getValue("value");
+            if (throwErrorWith != null && throwErrorWith.equals(value)) {
+                throw new IOException("Simulating record write failure.");
+            }
+            outRef.get().write(value.getBytes(StandardCharsets.UTF_8));
+            return WriteResult.of(1, Collections.emptyMap());
+        });
+    }
+
+    private void setupRecordReader(List<EventData> eventDataList) throws MalformedRecordException, IOException, SchemaNotFoundException {
+        setupRecordReader(eventDataList, -1, null);
+    }
+
+    private void setupRecordReader(List<EventData> eventDataList, int throwExceptionAt, String writeFailureWith)
+            throws MalformedRecordException, IOException, SchemaNotFoundException {
+        final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class);
+        processor.setReaderFactory(readerFactory);
+        final RecordReader reader = mock(RecordReader.class);
+        when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader);
+        final List<Record> recordList = eventDataList.stream()
+                .map(eventData -> toRecord(new String(eventData.getBytes())))
+                .collect(Collectors.toList());
+
+        // Add null to indicate the end of records.
+        final Function<List<Record>, List<Record>> addEndRecord = rs -> rs.stream()
+                // If the record is simulated to throw an exception when written, do not add a null record, to avoid messing up indices.
+                .flatMap(r -> r.getAsString("value").equals(writeFailureWith) ? Stream.of(r) : Stream.of(r, null))
+                .collect(Collectors.toList());
+
+        final List<Record> recordSetList = addEndRecord.apply(recordList);
+        final Record[] records = recordSetList.toArray(new Record[recordSetList.size()]);
+
+        switch (throwExceptionAt) {
+            case -1:
+                when(reader.nextRecord())
+                        .thenReturn(records[0], Arrays.copyOfRange(records, 1, records.length));
+                break;
+            case 0:
+                when(reader.nextRecord())
+                        .thenThrow(new MalformedRecordException("Simulating Record parse failure."))
+                        .thenReturn(records[0], Arrays.copyOfRange(records, 1, records.length));
+                break;
+            default:
+                final List<Record> recordList1 = addEndRecord.apply(recordList.subList(0, throwExceptionAt));
+                final List<Record> recordList2 = addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size()));
+                final Record[] records1 = recordList1.toArray(new Record[recordList1.size()]);
+                final Record[] records2 = recordList2.toArray(new Record[recordList2.size()]);
+                when(reader.nextRecord())
+                        .thenReturn(records1[0], Arrays.copyOfRange(records1, 1, records1.length))
+                        .thenThrow(new MalformedRecordException("Simulating Record parse failure."))
+                        .thenReturn(records2[0], Arrays.copyOfRange(records2, 1, records2.length));
+        }
+
+    }
+
+    @Test
+    public void testReceiveRecords() throws Exception {
+
+        final List<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList);
+
+        setupRecordWriter();
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertContentEquals("onetwo");
+        ff1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        ff1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/"
+                + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+    }
+
+    @Test
+    public void testReceiveRecordReaderFailure() throws Exception {
+
+        final List<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8)),
+                new EventData("three".getBytes(StandardCharsets.UTF_8)),
+                new EventData("four".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList, 2, null);
+
+        setupRecordWriter();
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertContentEquals("onetwofour");
+        ff1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        ff1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<MockFlowFile> failedFFs = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_PARSE_FAILURE);
+        assertEquals(1, failedFFs.size());
+        final MockFlowFile failed1 = failedFFs.get(0);
+        failed1.assertContentEquals("three");
+        failed1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        failed1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/"
+                + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+
+        final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent2.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/"
+                + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri());
+    }
+
+    @Test
+    public void testReceiveAllRecordFailure() throws Exception {
+
+        final List<EventData> eventDataList = Collections.singletonList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList, 0, null);
+
+        setupRecordWriter();
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(0, flowFiles.size());
+
+        final List<MockFlowFile> failedFFs = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_PARSE_FAILURE);
+        assertEquals(1, failedFFs.size());
+        final MockFlowFile failed1 = failedFFs.get(0);
+        failed1.assertContentEquals("one");
+        failed1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        failed1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/"
+                + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+
+    }
+
+    @Test
+    public void testReceiveRecordWriterFailure() throws Exception {
+
+        final List<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8)),
+                new EventData("three".getBytes(StandardCharsets.UTF_8)),
+                new EventData("four".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList, -1, "two");
+
+        setupRecordWriter("two");
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertContentEquals("onethreefour");
+        ff1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        ff1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<MockFlowFile> failedFFs = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_PARSE_FAILURE);
+        assertEquals(1, failedFFs.size());
+        final MockFlowFile failed1 = failedFFs.get(0);
+        failed1.assertContentEquals("two");
+        failed1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        failed1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
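+        // Two RECEIVE events are expected: one for the aggregated record FlowFile and one for the FlowFile routed to parse.failure.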
+ assertEquals(2, provenanceEvents.size()); + + final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0); + assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType()); + assertEquals("amqps://namespace.servicebus.windows.net/" + + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri()); + + final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1); + assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent2.getEventType()); + assertEquals("amqps://namespace.servicebus.windows.net/" + + "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri()); + } + +}