diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index a416336eed..4a4d463a4e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -96,6 +96,15 @@ com.azure azure-storage-queue + + com.microsoft.azure.kusto + kusto-data + 4.0.4 + + + org.slf4j + jcl-over-slf4j + com.fasterxml.jackson.core jackson-databind diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorer.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorer.java new file mode 100644 index 0000000000..574d954645 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorer.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.data.explorer.KustoQueryService; +import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"Azure", "Data", "Explorer", "ADX", "Kusto"}) +@CapabilityDescription("Query Azure Data Explorer and stream JSON results to output FlowFiles") +@WritesAttributes({ + @WritesAttribute(attribute = QueryAzureDataExplorer.QUERY_ERROR_MESSAGE, description = "Azure Data Explorer query error message on failures"), + @WritesAttribute(attribute = QueryAzureDataExplorer.QUERY_EXECUTED, description = "Azure Data Explorer query executed"), + @WritesAttribute(attribute = "mime.type", description = "Content Type set to application/json") +}) +public class QueryAzureDataExplorer extends AbstractProcessor { + public static final String QUERY_ERROR_MESSAGE = "query.error.message"; + + public static final String QUERY_EXECUTED = "query.executed"; + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles containing results of a successful Query") + .build(); + + public static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles containing original input associated with a failed Query") + .build(); + + public static final PropertyDescriptor KUSTO_QUERY_SERVICE = new PropertyDescriptor.Builder() + .name("Kusto Query Service") + .displayName("Kusto Query Service") + .description("Azure Data Explorer Kusto Query Service") + .required(true) + .identifiesControllerService(KustoQueryService.class) + .build(); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("Database Name") + .displayName("Database Name") + .description("Azure Data Explorer Database Name for querying") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("Query") + .displayName("Query") + .description("Query to be run against Azure Data Explorer") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + protected static final String APPLICATION_JSON = "application/json"; + + private static final Set RELATIONSHIPS = new LinkedHashSet<>(Arrays.asList(SUCCESS, FAILURE)); + + private static final List DESCRIPTORS = Arrays.asList(KUSTO_QUERY_SERVICE, DATABASE_NAME, QUERY); + + private volatile KustoQueryService service; + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + service = context.getProperty(KUSTO_QUERY_SERVICE).asControllerService(KustoQueryService.class); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + + try { + flowFile = session.putAttribute(flowFile, QUERY_EXECUTED, query); + + final KustoQueryResponse kustoQueryResponse = executeQuery(databaseName, query); + if (kustoQueryResponse.isError()) { + getLogger().error("Query failed: {}", kustoQueryResponse.getErrorMessage()); + flowFile = session.putAttribute(flowFile, QUERY_ERROR_MESSAGE, kustoQueryResponse.getErrorMessage()); + session.transfer(flowFile, FAILURE); + } else { + try (final InputStream responseStream = kustoQueryResponse.getResponseStream()) { + flowFile = session.importFrom(responseStream, flowFile); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + session.transfer(flowFile, SUCCESS); + } + } + } catch (final Exception e) { + getLogger().error("Query failed", e); + flowFile = session.putAttribute(flowFile, QUERY_ERROR_MESSAGE, e.getMessage()); + session.transfer(flowFile, FAILURE); + } + } + + protected KustoQueryResponse executeQuery(String databaseName, String adxQuery) { + return service.executeQuery(databaseName,adxQuery); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java new file mode 100644 index 0000000000..01946fe625 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoAuthenticationStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import org.apache.nifi.components.DescribedValue; + +enum KustoAuthenticationStrategy implements DescribedValue { + APPLICATION_CREDENTIALS("Application Credentials", "Azure Application Registration with Application Key"), + + MANAGED_IDENTITY("Managed Identity", "Azure Managed Identity"); + + private final String displayName; + + private final String description; + + KustoAuthenticationStrategy(final String displayName, final String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java new file mode 100644 index 0000000000..a7eb5db4dc --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryService.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.StreamingClient; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +@Tags({"Azure", "Data", "Explorer", "ADX", "Kusto"}) +@CapabilityDescription("Standard implementation of Kusto Query Service for Azure Data Explorer") +public class StandardKustoQueryService extends AbstractControllerService implements KustoQueryService { + + public static final PropertyDescriptor CLUSTER_URI = new PropertyDescriptor.Builder() + .name("Cluster URI") + .displayName("Cluster URI") + .description("Azure Data Explorer Cluster URI") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + public static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder() + .name("Authentication Strategy") + .displayName("Authentication Strategy") + .description("Authentication method for access to Azure Data Explorer") + .required(true) + .defaultValue(KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue()) + .allowableValues(KustoAuthenticationStrategy.class) + .build(); + + public static final PropertyDescriptor APPLICATION_CLIENT_ID = new PropertyDescriptor.Builder() + .name("Application Client ID") + .displayName("Application Client ID") + .description("Azure Data Explorer Application Client Identifier for Authentication") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor APPLICATION_TENANT_ID = new PropertyDescriptor.Builder() + .name("Application Tenant ID") + .displayName("Application Tenant ID") + .description("Azure Data Explorer Application Tenant Identifier for Authentication") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue()) + .build(); + + public static final PropertyDescriptor APPLICATION_KEY = new PropertyDescriptor.Builder() + .name("Application Key") + .displayName("Application Key") + .description("Azure Data Explorer Application Key for Authentication") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue()) + .build(); + + private static final List PROPERTY_DESCRIPTORS = Arrays.asList( + CLUSTER_URI, + AUTHENTICATION_STRATEGY, + APPLICATION_CLIENT_ID, + APPLICATION_TENANT_ID, + APPLICATION_KEY + ); + + private volatile StreamingClient kustoClient; + + @Override + public List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws ProcessException, URISyntaxException { + if (this.kustoClient == null) { + this.kustoClient = createClient(context); + } + } + + @OnStopped + public final void onStopped() { + if (this.kustoClient == null) { + getLogger().debug("Kusto Client not configured"); + } else { + try { + this.kustoClient.close(); + } catch (final Exception e) { + getLogger().error("Kusto Client close failed", e); + } + this.kustoClient = null; + } + } + + @Override + public KustoQueryResponse executeQuery(final String databaseName, final String query) { + Objects.requireNonNull(databaseName, "Database Name required"); + Objects.requireNonNull(query, "Query required"); + + KustoQueryResponse kustoQueryResponse; + + try { + final InputStream responseStream = this.kustoClient.executeStreamingQuery(databaseName, query); + kustoQueryResponse = new KustoQueryResponse(responseStream); + } catch (final Exception e) { + getLogger().error("Kusto Query execution failed", e); + kustoQueryResponse = new KustoQueryResponse(true, e.getMessage()); + } + return kustoQueryResponse; + } + + protected StreamingClient createClient(final ConfigurationContext context) throws URISyntaxException { + final ConnectionStringBuilder connectionStringBuilder = getConnectionStringBuilder(context); + return ClientFactory.createStreamingClient(connectionStringBuilder); + } + + @SuppressWarnings("unchecked") + private ConnectionStringBuilder getConnectionStringBuilder(final ConfigurationContext context) { + final ConnectionStringBuilder builder; + + final String clusterUrl = context.getProperty(CLUSTER_URI).getValue(); + final String clientId = context.getProperty(APPLICATION_CLIENT_ID).getValue(); + + final KustoAuthenticationStrategy kustoAuthenticationStrategy = KustoAuthenticationStrategy.valueOf(context.getProperty(AUTHENTICATION_STRATEGY).getValue()); + + if (KustoAuthenticationStrategy.MANAGED_IDENTITY == kustoAuthenticationStrategy) { + builder = ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, clientId); + } else { + final String applicationKey = context.getProperty(APPLICATION_KEY).getValue(); + final String tenantId = context.getProperty(APPLICATION_TENANT_ID).getValue(); + builder = ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl, clientId, applicationKey, tenantId); + } + + final String vendor = System.getProperty("java.vendor"); + final String version = System.getProperty("java.version"); + + builder.setConnectorDetails(vendor, version, null, null, false, null); + return builder; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 9e3a7ea3c9..c529f9efd0 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -16,6 +16,7 @@ org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService org.apache.nifi.services.azure.storage.ADLSCredentialsControllerServiceLookup org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService +org.apache.nifi.services.azure.data.explorer.StandardKustoQueryService org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12 org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12 org.apache.nifi.services.azure.StandardAzureCredentialsControllerService diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d82d4e5ea1..5f8458fd93 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -20,6 +20,7 @@ org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage org.apache.nifi.processors.azure.cosmos.document.PutAzureCosmosDBRecord +org.apache.nifi.processors.azure.data.explorer.QueryAzureDataExplorer org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12 diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorerTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorerTest.java new file mode 100644 index 0000000000..fd7e702f29 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/data/explorer/QueryAzureDataExplorerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.data.explorer; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse; +import org.apache.nifi.services.azure.data.explorer.KustoQueryService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class QueryAzureDataExplorerTest { + private static final String SERVICE_ID = KustoQueryService.class.getName(); + + private static final String DATABASE_NAME = "records"; + + private static final String QUERY = ".show table testing"; + + private static final byte[] EMPTY = new byte[]{}; + + private static final String ERROR_MESSAGE = "Results not found"; + + private static final String EMPTY_ARRAY = "[]"; + + @Mock + private KustoQueryService kustoQueryService; + + private TestRunner runner; + + @BeforeEach + void setRunner() throws InitializationException { + runner = TestRunners.newTestRunner(QueryAzureDataExplorer.class); + + when(kustoQueryService.getIdentifier()).thenReturn(SERVICE_ID); + runner.addControllerService(SERVICE_ID, kustoQueryService); + runner.enableControllerService(kustoQueryService); + } + + @Test + void testProperties() { + runner.assertNotValid(); + + runner.setProperty(QueryAzureDataExplorer.KUSTO_QUERY_SERVICE, SERVICE_ID); + runner.setProperty(QueryAzureDataExplorer.DATABASE_NAME, DATABASE_NAME); + runner.setProperty(QueryAzureDataExplorer.QUERY, QUERY); + + runner.assertValid(); + } + + @Test + void testRunFailure() { + runner.setProperty(QueryAzureDataExplorer.KUSTO_QUERY_SERVICE, SERVICE_ID); + runner.setProperty(QueryAzureDataExplorer.DATABASE_NAME, DATABASE_NAME); + runner.setProperty(QueryAzureDataExplorer.QUERY, QUERY); + + runner.enqueue(EMPTY); + + final KustoQueryResponse kustoQueryResponse = new KustoQueryResponse(true, ERROR_MESSAGE); + when(kustoQueryService.executeQuery(eq(DATABASE_NAME), eq(QUERY))).thenReturn(kustoQueryResponse); + + runner.run(); + + runner.assertAllFlowFilesTransferred(QueryAzureDataExplorer.FAILURE); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryAzureDataExplorer.FAILURE).iterator().next(); + flowFile.assertAttributeEquals(QueryAzureDataExplorer.QUERY_ERROR_MESSAGE, ERROR_MESSAGE); + flowFile.assertAttributeEquals(QueryAzureDataExplorer.QUERY_EXECUTED, QUERY); + } + + @Test + void testRunSuccess() { + runner.setProperty(QueryAzureDataExplorer.KUSTO_QUERY_SERVICE, SERVICE_ID); + runner.setProperty(QueryAzureDataExplorer.DATABASE_NAME, DATABASE_NAME); + runner.setProperty(QueryAzureDataExplorer.QUERY, QUERY); + + runner.enqueue(EMPTY); + + final ByteArrayInputStream inputStream = new ByteArrayInputStream(EMPTY_ARRAY.getBytes(StandardCharsets.UTF_8)); + final KustoQueryResponse kustoQueryResponse = new KustoQueryResponse(inputStream); + when(kustoQueryService.executeQuery(eq(DATABASE_NAME), eq(QUERY))).thenReturn(kustoQueryResponse); + + runner.run(); + + runner.assertAllFlowFilesTransferred(QueryAzureDataExplorer.SUCCESS); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryAzureDataExplorer.SUCCESS).iterator().next(); + flowFile.assertAttributeEquals(QueryAzureDataExplorer.QUERY_EXECUTED, QUERY); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), QueryAzureDataExplorer.APPLICATION_JSON); + flowFile.assertContentEquals(EMPTY_ARRAY); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryServiceTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryServiceTest.java new file mode 100644 index 0000000000..fc7bc92bdd --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/data/explorer/StandardKustoQueryServiceTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.UUID; + +public class StandardKustoQueryServiceTest { + private static final String SERVICE_ID = StandardKustoQueryServiceTest.class.getSimpleName(); + + private static final String CLUSTER_URI = "https://cluster.region.kusto.windows.net"; + + private static final String APPLICATION_CLIENT_ID = String.class.getSimpleName(); + + private StandardKustoQueryService service; + + private TestRunner runner; + + @BeforeEach + void setRunner() { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + service = new StandardKustoQueryService(); + } + + @Test + void testProperties() throws InitializationException { + runner.addControllerService(SERVICE_ID, service); + + runner.assertNotValid(service); + + runner.setProperty(service, StandardKustoQueryService.CLUSTER_URI, CLUSTER_URI); + runner.setProperty(service, StandardKustoQueryService.APPLICATION_CLIENT_ID, APPLICATION_CLIENT_ID); + + runner.assertValid(service); + } + + @Test + void testEnableManagedIdentity() throws InitializationException { + runner.addControllerService(SERVICE_ID, service); + + runner.setProperty(service, StandardKustoQueryService.CLUSTER_URI, CLUSTER_URI); + runner.setProperty(service, StandardKustoQueryService.AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue()); + runner.setProperty(service, StandardKustoQueryService.APPLICATION_CLIENT_ID, APPLICATION_CLIENT_ID); + + runner.assertValid(service); + + runner.enableControllerService(service); + } + + @Test + void testEnableApplicationCredentials() throws InitializationException { + runner.addControllerService(SERVICE_ID, service); + + runner.setProperty(service, StandardKustoQueryService.CLUSTER_URI, CLUSTER_URI); + runner.setProperty(service, StandardKustoQueryService.AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue()); + runner.setProperty(service, StandardKustoQueryService.APPLICATION_CLIENT_ID, APPLICATION_CLIENT_ID); + runner.setProperty(service, StandardKustoQueryService.APPLICATION_KEY, UUID.randomUUID().toString()); + runner.setProperty(service, StandardKustoQueryService.APPLICATION_TENANT_ID, UUID.randomUUID().toString()); + + runner.assertValid(service); + + runner.enableControllerService(service); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryResponse.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryResponse.java new file mode 100644 index 0000000000..fae6bb41b1 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryResponse.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import java.io.InputStream; +import java.util.Objects; + +public class KustoQueryResponse { + + private final InputStream responseStream; + + private final boolean error; + + private final String errorMessage; + + public KustoQueryResponse(final boolean error, final String errorMessage) { + this.responseStream = null; + this.error = error; + this.errorMessage = errorMessage; + } + + public KustoQueryResponse(final InputStream responseStream) { + this.responseStream = Objects.requireNonNull(responseStream, "Response Stream required"); + this.error = false; + this.errorMessage = null; + } + + public InputStream getResponseStream() { + return responseStream; + } + + public boolean isError() { + return error; + } + + public String getErrorMessage() { + return errorMessage; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryService.java new file mode 100644 index 0000000000..a78dd4436c --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/data/explorer/KustoQueryService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.data.explorer; + +import org.apache.nifi.controller.ControllerService; + +public interface KustoQueryService extends ControllerService { + /** + * Execute Query on Azure Data Explorer + * + * @param databaseName Database Name for query + * @param query Query to be executed + * @return Query Response containing stream or error status + */ + KustoQueryResponse executeQuery(String databaseName, String query); +}