mirror of https://github.com/apache/nifi.git
NIFI-10300 Added AzureEventHubRecordSink
This closes #6296 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
0d0a119737
commit
eb419b6e40
|
@ -514,6 +514,12 @@ language governing permissions and limitations under the License. -->
|
|||
<version>1.18.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-azure-record-sink-nar</artifactId>
|
||||
<version>1.18.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-azure-services-api-nar</artifactId>
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-azure-bundle</artifactId>
|
||||
<version>1.18.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-azure-record-sink-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
<properties>
|
||||
<maven.javadoc.skip>true</maven.javadoc.skip>
|
||||
<source.skip>true</source.skip>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-azure-record-sink</artifactId>
|
||||
<version>1.18.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<version>1.18.0-SNAPSHOT</version>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,73 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
language governing permissions and limitations under the License. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>nifi-azure-bundle</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<version>1.18.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>nifi-azure-record-sink</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.azure</groupId>
|
||||
<artifactId>azure-messaging-eventhubs</artifactId>
|
||||
<version>5.12.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.azure</groupId>
|
||||
<artifactId>azure-identity</artifactId>
|
||||
<version>${azure.identity.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-sink-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
<version>1.18.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-inline</artifactId>
|
||||
<version>${mockito.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock-record-utils</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.eventhub;
|
||||
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
public enum AzureAuthenticationStrategy implements DescribedValue {
|
||||
SHARED_ACCESS_KEY("Shared Access Key", "Azure Event Hub shared access key"),
|
||||
DEFAULT_AZURE_CREDENTIAL("Default Azure Credential", "The Default Azure Credential " +
|
||||
"will read credentials from standard environment variables and will also attempt to read " +
|
||||
"Managed Identity credentials when running in Microsoft Azure environments");
|
||||
|
||||
private final String displayName;
|
||||
private final String description;
|
||||
|
||||
AzureAuthenticationStrategy(String displayName, String description) {
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDisplayName() {
|
||||
return displayName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return name();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,242 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.eventhub;
|
||||
|
||||
import com.azure.core.credential.AzureNamedKeyCredential;
|
||||
import com.azure.identity.DefaultAzureCredential;
|
||||
import com.azure.identity.DefaultAzureCredentialBuilder;
|
||||
|
||||
import com.azure.messaging.eventhubs.EventData;
|
||||
import com.azure.messaging.eventhubs.EventDataBatch;
|
||||
import com.azure.messaging.eventhubs.EventHubClientBuilder;
|
||||
import com.azure.messaging.eventhubs.EventHubProducerClient;
|
||||
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.record.sink.RecordSinkService;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Tags({"azure", "record", "sink"})
|
||||
@CapabilityDescription("Format and send Records to Azure Event Hubs")
|
||||
public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
|
||||
|
||||
static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Default Service Bus Endpoint");
|
||||
|
||||
static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "China Service Bus Endpoint");
|
||||
|
||||
static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Germany Service Bus Endpoint");
|
||||
|
||||
static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "United States Government Endpoint");
|
||||
|
||||
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
|
||||
.name("Service Bus Endpoint")
|
||||
.description("Provides the domain for connecting to Azure Event Hubs")
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.allowableValues(
|
||||
AZURE_ENDPOINT,
|
||||
AZURE_CHINA_ENDPOINT,
|
||||
AZURE_GERMANY_ENDPOINT,
|
||||
AZURE_US_GOV_ENDPOINT
|
||||
)
|
||||
.defaultValue(AZURE_ENDPOINT.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor EVENT_HUB_NAMESPACE = new PropertyDescriptor.Builder()
|
||||
.name("Event Hub Namespace")
|
||||
.description("Provides provides the host for connecting to Azure Event Hubs")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Event Hub Name")
|
||||
.description("Provides the Event Hub Name for connections")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("Authentication Strategy")
|
||||
.description("Strategy for authenticating to Azure Event Hubs")
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.allowableValues(AzureAuthenticationStrategy.class)
|
||||
.required(true)
|
||||
.defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
|
||||
.name("Shared Access Policy")
|
||||
.description("The name of the shared access policy. This policy must have Send claims")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(false)
|
||||
.dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
|
||||
.name("Shared Access Policy Key")
|
||||
.description("The primary or secondary key of the shared access policy")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.required(false)
|
||||
.dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
|
||||
.name("Partition Key")
|
||||
.description("A hint for Azure Event Hub message broker how to distribute messages across one or more partitions")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
|
||||
Arrays.asList(
|
||||
SERVICE_BUS_ENDPOINT,
|
||||
EVENT_HUB_NAMESPACE,
|
||||
EVENT_HUB_NAME,
|
||||
RECORD_WRITER_FACTORY,
|
||||
AUTHENTICATION_STRATEGY,
|
||||
SHARED_ACCESS_POLICY,
|
||||
SHARED_ACCESS_POLICY_KEY,
|
||||
PARTITION_KEY
|
||||
)
|
||||
);
|
||||
|
||||
private volatile ConfigurationContext context;
|
||||
private RecordSetWriterFactory writerFactory;
|
||||
private EventHubProducerClient client;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTY_DESCRIPTORS;
|
||||
}
|
||||
|
||||
protected EventHubProducerClient createEventHubClient(final String namespace,
|
||||
final String serviceBusEndpoint,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final AzureAuthenticationStrategy authenticationStrategy
|
||||
) {
|
||||
final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
|
||||
final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
|
||||
if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
|
||||
final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
|
||||
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
|
||||
} else {
|
||||
final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
|
||||
final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
|
||||
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
|
||||
}
|
||||
return eventHubClientBuilder.buildProducerClient();
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
this.context = context;
|
||||
writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
|
||||
final String namespace = context.getProperty(EVENT_HUB_NAMESPACE).evaluateAttributeExpressions().getValue();
|
||||
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).evaluateAttributeExpressions().getValue();
|
||||
final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
|
||||
final String policyName = context.getProperty(SHARED_ACCESS_POLICY).getValue();
|
||||
final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
|
||||
final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue();
|
||||
final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy);
|
||||
client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy);
|
||||
}
|
||||
|
||||
@OnDisabled
|
||||
public void onDisabled() {
|
||||
if (client == null) {
|
||||
getLogger().debug("Event Hub Client not configured");
|
||||
} else {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult sendData(final RecordSet recordSet, Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
|
||||
final Map<String, String> writeAttributes = new LinkedHashMap<>(attributes);
|
||||
final String partitionKey = context.getProperty(PARTITION_KEY).evaluateAttributeExpressions(attributes).getValue();
|
||||
final CreateBatchOptions createBatchOptions = new CreateBatchOptions();
|
||||
createBatchOptions.setPartitionKey(partitionKey);
|
||||
EventDataBatch eventDataBatch = client.createBatch(createBatchOptions);
|
||||
final String correlationId = writeAttributes.get(CoreAttributes.UUID.key());
|
||||
int recordCount = 0;
|
||||
try (
|
||||
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, attributes)
|
||||
) {
|
||||
Record record;
|
||||
final String contentType = writer.getMimeType();
|
||||
while ((record = recordSet.next()) != null) {
|
||||
writer.write(record);
|
||||
writer.flush();
|
||||
recordCount++;
|
||||
|
||||
final byte[] bytes = outputStream.toByteArray();
|
||||
outputStream.reset();
|
||||
final EventData eventData = new EventData(bytes);
|
||||
eventData.getProperties().putAll(writeAttributes);
|
||||
eventData.setContentType(contentType);
|
||||
eventData.setCorrelationId(correlationId);
|
||||
eventData.setMessageId(String.format("%s-%d", correlationId, recordCount));
|
||||
if (!eventDataBatch.tryAdd(eventData)) {
|
||||
if (eventDataBatch.getCount() > 0) {
|
||||
client.send(eventDataBatch);
|
||||
eventDataBatch = client.createBatch(createBatchOptions);
|
||||
}
|
||||
if (!eventDataBatch.tryAdd(eventData)) {
|
||||
throw new ProcessException("Record " + recordCount + " exceeds maximum event data size: " + eventDataBatch.getMaxSizeInBytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (eventDataBatch.getCount() > 0) {
|
||||
client.send(eventDataBatch);
|
||||
}
|
||||
} catch (final SchemaNotFoundException e) {
|
||||
throw new IOException("Record Schema not found", e);
|
||||
}
|
||||
return WriteResult.of(recordCount, writeAttributes);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink
|
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.eventhub;
|
||||
|
||||
import com.azure.messaging.eventhubs.EventData;
|
||||
import com.azure.messaging.eventhubs.EventDataBatch;
|
||||
import com.azure.messaging.eventhubs.EventHubProducerClient;
|
||||
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.isA;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestAzureEventHubRecordSink {
|
||||
private static final String EVENT_HUB_NAMESPACE = "namespace";
|
||||
private static final String EVENT_HUB_NAME = "hub";
|
||||
private static final String POLICY_KEY = "policyKey";
|
||||
private static final String NULL_HEADER = null;
|
||||
private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
|
||||
private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName();
|
||||
private static final String ID_FIELD = "id";
|
||||
private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName();
|
||||
private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
|
||||
private static final boolean SEND_ZERO_RESULTS = true;
|
||||
|
||||
@Mock
|
||||
private EventHubProducerClient client;
|
||||
|
||||
@Mock
|
||||
private EventDataBatch eventDataBatch;
|
||||
|
||||
private AzureEventHubRecordSink azureEventHubRecordSink;
|
||||
|
||||
@BeforeEach
|
||||
void setRunner() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
|
||||
runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
|
||||
runner.enableControllerService(recordWriter);
|
||||
|
||||
azureEventHubRecordSink = new MockAzureEventHubRecordSink();
|
||||
runner.addControllerService(IDENTIFIER, azureEventHubRecordSink);
|
||||
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAME, EVENT_HUB_NAME);
|
||||
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAMESPACE, EVENT_HUB_NAMESPACE);
|
||||
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, POLICY_KEY);
|
||||
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
|
||||
runner.enableControllerService(azureEventHubRecordSink);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendDataNoRecords() throws IOException {
|
||||
when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
|
||||
|
||||
final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
|
||||
final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
|
||||
|
||||
assertNotNull(writeResult);
|
||||
assertEquals(0, writeResult.getRecordCount());
|
||||
|
||||
verify(client, times(0)).send(any(EventDataBatch.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendDataOneRecordException() {
|
||||
when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
|
||||
when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(false);
|
||||
|
||||
final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1));
|
||||
assertThrows(ProcessException.class, ()-> azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS));
|
||||
|
||||
verify(client, never()).send(any(EventDataBatch.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendDataOneRecord() throws IOException {
|
||||
when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
|
||||
when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true);
|
||||
|
||||
final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1));
|
||||
final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
|
||||
|
||||
assertNotNull(writeResult);
|
||||
assertEquals(1, writeResult.getRecordCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendDataTwoRecords() throws IOException {
|
||||
when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
|
||||
when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true);
|
||||
|
||||
final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(2));
|
||||
final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
|
||||
|
||||
assertNotNull(writeResult);
|
||||
assertEquals(2, writeResult.getRecordCount());
|
||||
}
|
||||
|
||||
public class MockAzureEventHubRecordSink extends AzureEventHubRecordSink {
|
||||
@Override
|
||||
protected EventHubProducerClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String serviceBusEndpoint,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
private static RecordSchema getRecordSchema() {
|
||||
final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
|
||||
return new SimpleRecordSchema(Collections.singletonList(idField));
|
||||
}
|
||||
|
||||
private static Record[] getRecords(int numberOfRecords) {
|
||||
final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
|
||||
final Record record = new MapRecord(RECORD_SCHEMA, values);
|
||||
final Record[] records = new Record[numberOfRecords];
|
||||
Arrays.fill(records, record);
|
||||
return records;
|
||||
}
|
||||
}
|
|
@ -41,6 +41,8 @@
|
|||
<module>nifi-azure-nar</module>
|
||||
<module>nifi-azure-services-api</module>
|
||||
<module>nifi-azure-services-api-nar</module>
|
||||
<module>nifi-azure-record-sink</module>
|
||||
<module>nifi-azure-record-sink-nar</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
|
|
Loading…
Reference in New Issue