mirror of https://github.com/apache/nifi.git
NIFI-7943 - Add application properties to GetAzureEventHub
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4617.
This commit is contained in:
parent
c8ea7523ef
commit
9a11d23c83
|
@ -20,8 +20,8 @@
|
||||||
<artifactId>nifi-azure-processors</artifactId>
|
<artifactId>nifi-azure-processors</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<properties>
|
<properties>
|
||||||
<azure-eventhubs.version>3.1.1</azure-eventhubs.version>
|
<azure-eventhubs.version>3.2.1</azure-eventhubs.version>
|
||||||
<azure-eventhubs-eph.version>3.1.1</azure-eventhubs-eph.version>
|
<azure-eventhubs-eph.version>3.2.1</azure-eventhubs-eph.version>
|
||||||
</properties>
|
</properties>
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -86,7 +86,8 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
|
||||||
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
|
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
|
||||||
@WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"),
|
@WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"),
|
||||||
@WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
|
@WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
|
||||||
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled")
|
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"),
|
||||||
|
@WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
|
||||||
})
|
})
|
||||||
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
||||||
|
|
||||||
|
@ -401,6 +402,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
||||||
attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
|
attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData);
|
||||||
|
attributes.putAll(applicationProperties);
|
||||||
|
|
||||||
attributes.put("eventhub.name", eventHubName);
|
attributes.put("eventhub.name", eventHubName);
|
||||||
attributes.put("eventhub.partition", partitionId);
|
attributes.put("eventhub.partition", partitionId);
|
||||||
}
|
}
|
||||||
|
@ -414,8 +418,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
||||||
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
|
putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
|
||||||
|
|
||||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
|
||||||
flowFile = session.write(flowFile, out -> {
|
flowFile = session.write(flowFile, out -> {
|
||||||
out.write(eventData.getBytes());
|
out.write(eventData.getBytes());
|
||||||
});
|
});
|
||||||
|
|
|
@ -77,7 +77,8 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
|
||||||
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
|
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
|
||||||
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure sequence number associated with the message"),
|
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure sequence number associated with the message"),
|
||||||
@WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
|
@WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
|
||||||
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled")
|
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled"),
|
||||||
|
@WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
|
||||||
})
|
})
|
||||||
public class GetAzureEventHub extends AbstractProcessor {
|
public class GetAzureEventHub extends AbstractProcessor {
|
||||||
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
|
||||||
|
@ -375,6 +376,9 @@ public class GetAzureEventHub extends AbstractProcessor {
|
||||||
attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
|
attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData);
|
||||||
|
attributes.putAll(applicationProperties);
|
||||||
|
|
||||||
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
|
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
|
||||||
attributes.put("eventhub.partition", partitionId);
|
attributes.put("eventhub.partition", partitionId);
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,11 @@ package org.apache.nifi.processors.azure.eventhub.utils;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
|
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
|
||||||
|
import com.microsoft.azure.eventhubs.EventData;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
@ -87,4 +90,17 @@ public final class AzureEventHubUtils {
|
||||||
.setSasKeyName(sasName)
|
.setSasKeyName(sasName)
|
||||||
.setSasKey(sasKey).toString();
|
.setSasKey(sasKey).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Map<String, String> getApplicationProperties(EventData eventData) {
|
||||||
|
final Map<String, String> properties = new HashMap<>();
|
||||||
|
|
||||||
|
final Map<String,Object> applicationProperties = eventData.getProperties();
|
||||||
|
if (null != applicationProperties) {
|
||||||
|
for (Map.Entry<String, Object> property : applicationProperties.entrySet()) {
|
||||||
|
properties.put(String.format("eventhub.property.%s", property.getKey()), property.getValue().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,6 +144,19 @@ public class GetAzureEventHubTest {
|
||||||
testRunner.clearTransferState();
|
testRunner.clearTransferState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNormalFlowWithApplicationProperties() throws Exception {
|
||||||
|
setUpStandardTestConfig();
|
||||||
|
testRunner.run(1, true);
|
||||||
|
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
|
||||||
|
|
||||||
|
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
|
||||||
|
flowFile.assertAttributeEquals("eventhub.property.event-sender", "Apache NiFi");
|
||||||
|
flowFile.assertAttributeEquals("eventhub.property.application", "TestApp");
|
||||||
|
|
||||||
|
testRunner.clearTransferState();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNormalNotReceivedEventsFlow() throws Exception {
|
public void testNormalNotReceivedEventsFlow() throws Exception {
|
||||||
setUpStandardTestConfig();
|
setUpStandardTestConfig();
|
||||||
|
@ -193,6 +206,8 @@ public class GetAzureEventHubTest {
|
||||||
final LinkedList<EventData> receivedEvents = new LinkedList<>();
|
final LinkedList<EventData> receivedEvents = new LinkedList<>();
|
||||||
for(int i = 0; i < 10; i++){
|
for(int i = 0; i < 10; i++){
|
||||||
EventData eventData = EventData.create(String.format("test event number: %d", i).getBytes());
|
EventData eventData = EventData.create(String.format("test event number: %d", i).getBytes());
|
||||||
|
eventData.getProperties().put("event-sender", "Apache NiFi");
|
||||||
|
eventData.getProperties().put("application", "TestApp");
|
||||||
if (received) {
|
if (received) {
|
||||||
HashMap<String, Object> properties = new HashMap<>();
|
HashMap<String, Object> properties = new HashMap<>();
|
||||||
properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
|
properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
|
||||||
|
|
|
@ -107,6 +107,7 @@ public class TestConsumeAzureEventHub {
|
||||||
when(partitionContext.getPartitionId()).thenReturn("partition-id");
|
when(partitionContext.getPartitionId()).thenReturn("partition-id");
|
||||||
when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
|
when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException {
|
public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException {
|
||||||
TestRunner testRunner = TestRunners.newTestRunner(processor);
|
TestRunner testRunner = TestRunners.newTestRunner(processor);
|
||||||
|
@ -130,6 +131,23 @@ public class TestConsumeAzureEventHub {
|
||||||
testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
|
testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
|
||||||
testRunner.assertValid();
|
testRunner.assertValid();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReceivedApplicationProperties() throws Exception {
|
||||||
|
final EventData singleEvent = EventData.create("one".getBytes(StandardCharsets.UTF_8));
|
||||||
|
singleEvent.getProperties().put("event-sender", "Apache NiFi");
|
||||||
|
singleEvent.getProperties().put("application", "TestApp");
|
||||||
|
final Iterable<EventData> eventDataList = Arrays.asList(singleEvent);
|
||||||
|
eventProcessor.onEvents(partitionContext, eventDataList);
|
||||||
|
|
||||||
|
processSession.assertCommitted();
|
||||||
|
final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
|
||||||
|
assertEquals(1, flowFiles.size());
|
||||||
|
final MockFlowFile msg1 = flowFiles.get(0);
|
||||||
|
msg1.assertAttributeEquals("eventhub.property.event-sender", "Apache NiFi");
|
||||||
|
msg1.assertAttributeEquals("eventhub.property.application", "TestApp");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReceiveOne() throws Exception {
|
public void testReceiveOne() throws Exception {
|
||||||
final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
|
final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
|
||||||
|
|
Loading…
Reference in New Issue