diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 79198f04c7..cf0e7373ba 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -31,13 +31,8 @@ com.microsoft.azure azure-eventhubs - 0.9.0 + 0.14.2 - com.microsoft.azure azure-storage diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 69d558640b..c40707e8f4 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -338,12 +338,12 @@ public class GetAzureEventHub extends AbstractProcessor { final Map attributes = new HashMap<>(); FlowFile flowFile = session.create(); - EventData.SystemProperties systemProperties = eventData.getSystemProperties(); + final EventData.SystemProperties systemProperties = eventData.getSystemProperties(); if (null != systemProperties) { - attributes.put("eventhub.enqueued.timestamp", String.valueOf(eventData.getSystemProperties().getEnqueuedTime())); - attributes.put("eventhub.offset", eventData.getSystemProperties().getOffset()); - attributes.put("eventhub.sequence", String.valueOf(eventData.getSystemProperties().getSequenceNumber())); + attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime())); + attributes.put("eventhub.offset", systemProperties.getOffset()); + attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber())); } attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue()); @@ -352,10 +352,9 @@ public class GetAzureEventHub extends AbstractProcessor { flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.write(flowFile, out -> { - out.write(eventData.getBody()); + out.write(eventData.getBytes()); }); - session.transfer(flowFile, REL_SUCCESS); final String namespace = context.getProperty(NAMESPACE).getValue();