diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 2837f3d080..253a43d9aa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -20,8 +20,8 @@ nifi-azure-processors jar - 3.1.1 - 3.1.1 + 3.2.1 + 3.2.1 diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 7873e4c607..d01aa3455b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -86,7 +86,8 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"), - @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled") + @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"), + @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'") }) public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { @@ -401,6 +402,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber())); } + final Map applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData); + attributes.putAll(applicationProperties); + attributes.put("eventhub.name", eventHubName); attributes.put("eventhub.partition", partitionId); } @@ -414,8 +418,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final Map attributes = new HashMap<>(); putEventHubAttributes(attributes, eventHubName, partitionId, eventData); - flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, out -> { out.write(eventData.getBytes()); }); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 78a33270c2..f40f668e92 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -77,7 +77,8 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"), - @WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled") + @WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled"), + @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'") }) public class GetAzureEventHub extends AbstractProcessor { static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() @@ -375,6 +376,9 @@ public class GetAzureEventHub extends AbstractProcessor { attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber())); } + final Map applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData); + attributes.putAll(applicationProperties); + attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue()); attributes.put("eventhub.partition", partitionId); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java index d95e27f22c..f71f6c8495 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java @@ -18,8 +18,11 @@ package org.apache.nifi.processors.azure.eventhub.utils; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventData; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -87,4 +90,17 @@ public final class AzureEventHubUtils { .setSasKeyName(sasName) .setSasKey(sasKey).toString(); } + + public static Map getApplicationProperties(EventData eventData) { + final Map properties = new HashMap<>(); + + final Map applicationProperties = eventData.getProperties(); + if (null != applicationProperties) { + for (Map.Entry property : applicationProperties.entrySet()) { + properties.put(String.format("eventhub.property.%s", property.getKey()), property.getValue().toString()); + } + } + + return properties; + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index 88bcfa8fed..b33bc6f4fd 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -144,6 +144,19 @@ public class GetAzureEventHubTest { testRunner.clearTransferState(); } + @Test + public void testNormalFlowWithApplicationProperties() throws Exception { + setUpStandardTestConfig(); + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals("eventhub.property.event-sender", "Apache NiFi"); + flowFile.assertAttributeEquals("eventhub.property.application", "TestApp"); + + testRunner.clearTransferState(); + } + @Test public void testNormalNotReceivedEventsFlow() throws Exception { setUpStandardTestConfig(); @@ -193,6 +206,8 @@ public class GetAzureEventHubTest { final LinkedList receivedEvents = new LinkedList<>(); for(int i = 0; i < 10; i++){ EventData eventData = EventData.create(String.format("test event number: %d", i).getBytes()); + eventData.getProperties().put("event-sender", "Apache NiFi"); + eventData.getProperties().put("application", "TestApp"); if (received) { HashMap properties = new HashMap<>(); properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE); @@ -232,4 +247,4 @@ public class GetAzureEventHubTest { testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4"); testRunner.assertValid(); } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 99541a5509..17f844289e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -107,6 +107,7 @@ public class TestConsumeAzureEventHub { when(partitionContext.getPartitionId()).thenReturn("partition-id"); when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group"); } + @Test public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException { TestRunner testRunner = TestRunners.newTestRunner(processor); @@ -130,6 +131,23 @@ public class TestConsumeAzureEventHub { testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true"); testRunner.assertValid(); } + + @Test + public void testReceivedApplicationProperties() throws Exception { + final EventData singleEvent = EventData.create("one".getBytes(StandardCharsets.UTF_8)); + singleEvent.getProperties().put("event-sender", "Apache NiFi"); + singleEvent.getProperties().put("application", "TestApp"); + final Iterable eventDataList = Arrays.asList(singleEvent); + eventProcessor.onEvents(partitionContext, eventDataList); + + processSession.assertCommitted(); + final List flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + final MockFlowFile msg1 = flowFiles.get(0); + msg1.assertAttributeEquals("eventhub.property.event-sender", "Apache NiFi"); + msg1.assertAttributeEquals("eventhub.property.application", "TestApp"); + } + @Test public void testReceiveOne() throws Exception { final Iterable eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));