diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index c50e2ca7fe..e991bae734 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -514,6 +514,12 @@ language governing permissions and limitations under the License. --> 1.18.0-SNAPSHOT nar + + org.apache.nifi + nifi-azure-record-sink-nar + 1.18.0-SNAPSHOT + nar + org.apache.nifi nifi-azure-services-api-nar diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml new file mode 100644 index 0000000000..912ce0a1c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-azure-bundle + 1.18.0-SNAPSHOT + + + nifi-azure-record-sink-nar + nar + + true + true + + + + + org.apache.nifi + nifi-azure-record-sink + 1.18.0-SNAPSHOT + + + org.apache.nifi + nifi-standard-services-api-nar + 1.18.0-SNAPSHOT + nar + + + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml new file mode 100644 index 0000000000..a8d6073937 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml @@ -0,0 +1,73 @@ + + + + + nifi-azure-bundle + org.apache.nifi + 1.18.0-SNAPSHOT + + 4.0.0 + + nifi-azure-record-sink + + + + com.azure + azure-messaging-eventhubs + 5.12.0 + + + com.azure + azure-identity + ${azure.identity.version} + + + org.apache.nifi + nifi-record-sink-api + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.18.0-SNAPSHOT + + + org.mockito + mockito-inline + ${mockito.version} + test + + + org.apache.nifi + nifi-mock + test + + + org.apache.nifi + nifi-mock-record-utils + test + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java new file mode 100644 index 0000000000..65d40538ae --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.eventhub; + +import org.apache.nifi.components.DescribedValue; + +public enum AzureAuthenticationStrategy implements DescribedValue { + SHARED_ACCESS_KEY("Shared Access Key", "Azure Event Hub shared access key"), + DEFAULT_AZURE_CREDENTIAL("Default Azure Credential", "The Default Azure Credential " + + "will read credentials from standard environment variables and will also attempt to read " + + "Managed Identity credentials when running in Microsoft Azure environments"); + + private final String displayName; + private final String description; + + AzureAuthenticationStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String getValue() { + return name(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java new file mode 100644 index 0000000000..53f09d5061 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.eventhub; + +import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.identity.DefaultAzureCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; + +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.record.sink.RecordSinkService; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +@Tags({"azure", "record", "sink"}) +@CapabilityDescription("Format and send Records to Azure Event Hubs") +public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService { + + static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Default Service Bus Endpoint"); + + static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "China Service Bus Endpoint"); + + static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Germany Service Bus Endpoint"); + + static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "United States Government Endpoint"); + + static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder() + .name("Service Bus Endpoint") + .description("Provides the domain for connecting to Azure Event Hubs") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues( + AZURE_ENDPOINT, + AZURE_CHINA_ENDPOINT, + AZURE_GERMANY_ENDPOINT, + AZURE_US_GOV_ENDPOINT + ) + .defaultValue(AZURE_ENDPOINT.getValue()) + .required(true) + .build(); + + static final PropertyDescriptor EVENT_HUB_NAMESPACE = new PropertyDescriptor.Builder() + .name("Event Hub Namespace") + .description("Provides provides the host for connecting to Azure Event Hubs") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() + .name("Event Hub Name") + .description("Provides the Event Hub Name for connections") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder() + .name("Authentication Strategy") + .description("Strategy for authenticating to Azure Event Hubs") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .allowableValues(AzureAuthenticationStrategy.class) + .required(true) + .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue()) + .build(); + + static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder() + .name("Shared Access Policy") + .description("The name of the shared access policy. This policy must have Send claims") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue()) + .build(); + + static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Key") + .description("The primary or secondary key of the shared access policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .required(false) + .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue()) + .build(); + + static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder() + .name("Partition Key") + .description("A hint for Azure Event Hub message broker how to distribute messages across one or more partitions") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .build(); + + private static final List PROPERTY_DESCRIPTORS = Collections.unmodifiableList( + Arrays.asList( + SERVICE_BUS_ENDPOINT, + EVENT_HUB_NAMESPACE, + EVENT_HUB_NAME, + RECORD_WRITER_FACTORY, + AUTHENTICATION_STRATEGY, + SHARED_ACCESS_POLICY, + SHARED_ACCESS_POLICY_KEY, + PARTITION_KEY + ) + ); + + private volatile ConfigurationContext context; + private RecordSetWriterFactory writerFactory; + private EventHubProducerClient client; + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + protected EventHubProducerClient createEventHubClient(final String namespace, + final String serviceBusEndpoint, + final String eventHubName, + final String policyName, + final String policyKey, + final AzureAuthenticationStrategy authenticationStrategy + ) { + final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint); + final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder(); + if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) { + final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey); + eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential); + } else { + final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder(); + final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build(); + eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential); + } + return eventHubClientBuilder.buildProducerClient(); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.context = context; + writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class); + final String namespace = context.getProperty(EVENT_HUB_NAMESPACE).evaluateAttributeExpressions().getValue(); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).evaluateAttributeExpressions().getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue(); + final String policyName = context.getProperty(SHARED_ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue(); + final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue(); + final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy); + client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy); + } + + @OnDisabled + public void onDisabled() { + if (client == null) { + getLogger().debug("Event Hub Client not configured"); + } else { + client.close(); + } + } + + @Override + public WriteResult sendData(final RecordSet recordSet, Map attributes, final boolean sendZeroResults) throws IOException { + final Map writeAttributes = new LinkedHashMap<>(attributes); + final String partitionKey = context.getProperty(PARTITION_KEY).evaluateAttributeExpressions(attributes).getValue(); + final CreateBatchOptions createBatchOptions = new CreateBatchOptions(); + createBatchOptions.setPartitionKey(partitionKey); + EventDataBatch eventDataBatch = client.createBatch(createBatchOptions); + final String correlationId = writeAttributes.get(CoreAttributes.UUID.key()); + int recordCount = 0; + try ( + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, attributes) + ) { + Record record; + final String contentType = writer.getMimeType(); + while ((record = recordSet.next()) != null) { + writer.write(record); + writer.flush(); + recordCount++; + + final byte[] bytes = outputStream.toByteArray(); + outputStream.reset(); + final EventData eventData = new EventData(bytes); + eventData.getProperties().putAll(writeAttributes); + eventData.setContentType(contentType); + eventData.setCorrelationId(correlationId); + eventData.setMessageId(String.format("%s-%d", correlationId, recordCount)); + if (!eventDataBatch.tryAdd(eventData)) { + if (eventDataBatch.getCount() > 0) { + client.send(eventDataBatch); + eventDataBatch = client.createBatch(createBatchOptions); + } + if (!eventDataBatch.tryAdd(eventData)) { + throw new ProcessException("Record " + recordCount + " exceeds maximum event data size: " + eventDataBatch.getMaxSizeInBytes()); + } + } + } + if (eventDataBatch.getCount() > 0) { + client.send(eventDataBatch); + } + } catch (final SchemaNotFoundException e) { + throw new IOException("Record Schema not found", e); + } + return WriteResult.of(recordCount, writeAttributes); + } +} + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..2684da536e --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java new file mode 100644 index 0000000000..8ab5595a6b --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.eventhub; + +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.Map; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; + +@ExtendWith(MockitoExtension.class) +public class TestAzureEventHubRecordSink { + private static final String EVENT_HUB_NAMESPACE = "namespace"; + private static final String EVENT_HUB_NAME = "hub"; + private static final String POLICY_KEY = "policyKey"; + private static final String NULL_HEADER = null; + private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName(); + private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName(); + private static final String ID_FIELD = "id"; + private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName(); + private static final RecordSchema RECORD_SCHEMA = getRecordSchema(); + private static final boolean SEND_ZERO_RESULTS = true; + + @Mock + private EventHubProducerClient client; + + @Mock + private EventDataBatch eventDataBatch; + + private AzureEventHubRecordSink azureEventHubRecordSink; + + @BeforeEach + void setRunner() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.setValidateExpressionUsage(false); + + final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false); + runner.addControllerService(WRITER_IDENTIFIER, recordWriter); + runner.enableControllerService(recordWriter); + + azureEventHubRecordSink = new MockAzureEventHubRecordSink(); + runner.addControllerService(IDENTIFIER, azureEventHubRecordSink); + runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAME, EVENT_HUB_NAME); + runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAMESPACE, EVENT_HUB_NAMESPACE); + runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, POLICY_KEY); + runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER); + runner.enableControllerService(azureEventHubRecordSink); + } + + @Test + void testSendDataNoRecords() throws IOException { + when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch); + + final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA); + final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS); + + assertNotNull(writeResult); + assertEquals(0, writeResult.getRecordCount()); + + verify(client, times(0)).send(any(EventDataBatch.class)); + } + + @Test + void testSendDataOneRecordException() { + when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch); + when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(false); + + final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1)); + assertThrows(ProcessException.class, ()-> azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS)); + + verify(client, never()).send(any(EventDataBatch.class)); + } + + @Test + void testSendDataOneRecord() throws IOException { + when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch); + when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true); + + final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1)); + final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS); + + assertNotNull(writeResult); + assertEquals(1, writeResult.getRecordCount()); + } + + @Test + void testSendDataTwoRecords() throws IOException { + when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch); + when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true); + + final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(2)); + final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS); + + assertNotNull(writeResult); + assertEquals(2, writeResult.getRecordCount()); + } + + public class MockAzureEventHubRecordSink extends AzureEventHubRecordSink { + @Override + protected EventHubProducerClient createEventHubClient( + final String namespace, + final String serviceBusEndpoint, + final String eventHubName, + final String policyName, + final String policyKey, + final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException { + return client; + } + } + + private static RecordSchema getRecordSchema() { + final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType()); + return new SimpleRecordSchema(Collections.singletonList(idField)); + } + + private static Record[] getRecords(int numberOfRecords) { + final Map values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE); + final Record record = new MapRecord(RECORD_SCHEMA, values); + final Record[] records = new Record[numberOfRecords]; + Arrays.fill(records, record); + return records; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml index a00ad4db2e..f617b0a3cd 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -41,6 +41,8 @@ nifi-azure-nar nifi-azure-services-api nifi-azure-services-api-nar + nifi-azure-record-sink + nifi-azure-record-sink-nar