mirror of https://github.com/apache/nifi.git
NIFI-8294 Added QueryAzureDataExplorer and StandardKustoQueryService
This closes #7122 Co-authored-by: David Handermann <exceptionfactory@apache.org> Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
ce623632cf
commit
bd33f2c911
|
@ -96,6 +96,15 @@
|
|||
<groupId>com.azure</groupId>
|
||||
<artifactId>azure-storage-queue</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure.kusto</groupId>
|
||||
<artifactId>kusto-data</artifactId>
|
||||
<version>4.0.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>jcl-over-slf4j</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.data.explorer;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.services.azure.data.explorer.KustoQueryService;
|
||||
import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({"Azure", "Data", "Explorer", "ADX", "Kusto"})
|
||||
@CapabilityDescription("Query Azure Data Explorer and stream JSON results to output FlowFiles")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = QueryAzureDataExplorer.QUERY_ERROR_MESSAGE, description = "Azure Data Explorer query error message on failures"),
|
||||
@WritesAttribute(attribute = QueryAzureDataExplorer.QUERY_EXECUTED, description = "Azure Data Explorer query executed"),
|
||||
@WritesAttribute(attribute = "mime.type", description = "Content Type set to application/json")
|
||||
})
|
||||
public class QueryAzureDataExplorer extends AbstractProcessor {
|
||||
public static final String QUERY_ERROR_MESSAGE = "query.error.message";
|
||||
|
||||
public static final String QUERY_EXECUTED = "query.executed";
|
||||
|
||||
public static final Relationship SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("FlowFiles containing results of a successful Query")
|
||||
.build();
|
||||
|
||||
public static final Relationship FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("FlowFiles containing original input associated with a failed Query")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor KUSTO_QUERY_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("Kusto Query Service")
|
||||
.displayName("Kusto Query Service")
|
||||
.description("Azure Data Explorer Kusto Query Service")
|
||||
.required(true)
|
||||
.identifiesControllerService(KustoQueryService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Database Name")
|
||||
.displayName("Database Name")
|
||||
.description("Azure Data Explorer Database Name for querying")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
|
||||
.name("Query")
|
||||
.displayName("Query")
|
||||
.description("Query to be run against Azure Data Explorer")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
|
||||
protected static final String APPLICATION_JSON = "application/json";
|
||||
|
||||
private static final Set<Relationship> RELATIONSHIPS = new LinkedHashSet<>(Arrays.asList(SUCCESS, FAILURE));
|
||||
|
||||
private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(KUSTO_QUERY_SERVICE, DATABASE_NAME, QUERY);
|
||||
|
||||
private volatile KustoQueryService service;
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
service = context.getProperty(KUSTO_QUERY_SERVICE).asControllerService(KustoQueryService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
try {
|
||||
flowFile = session.putAttribute(flowFile, QUERY_EXECUTED, query);
|
||||
|
||||
final KustoQueryResponse kustoQueryResponse = executeQuery(databaseName, query);
|
||||
if (kustoQueryResponse.isError()) {
|
||||
getLogger().error("Query failed: {}", kustoQueryResponse.getErrorMessage());
|
||||
flowFile = session.putAttribute(flowFile, QUERY_ERROR_MESSAGE, kustoQueryResponse.getErrorMessage());
|
||||
session.transfer(flowFile, FAILURE);
|
||||
} else {
|
||||
try (final InputStream responseStream = kustoQueryResponse.getResponseStream()) {
|
||||
flowFile = session.importFrom(responseStream, flowFile);
|
||||
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
|
||||
session.transfer(flowFile, SUCCESS);
|
||||
}
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Query failed", e);
|
||||
flowFile = session.putAttribute(flowFile, QUERY_ERROR_MESSAGE, e.getMessage());
|
||||
session.transfer(flowFile, FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
protected KustoQueryResponse executeQuery(String databaseName, String adxQuery) {
|
||||
return service.executeQuery(databaseName,adxQuery);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.data.explorer;
|
||||
|
||||
import org.apache.nifi.components.DescribedValue;
|
||||
|
||||
enum KustoAuthenticationStrategy implements DescribedValue {
|
||||
APPLICATION_CREDENTIALS("Application Credentials", "Azure Application Registration with Application Key"),
|
||||
|
||||
MANAGED_IDENTITY("Managed Identity", "Azure Managed Identity");
|
||||
|
||||
private final String displayName;
|
||||
|
||||
private final String description;
|
||||
|
||||
KustoAuthenticationStrategy(final String displayName, final String description) {
|
||||
this.displayName = displayName;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDisplayName() {
|
||||
return displayName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.data.explorer;
|
||||
|
||||
import com.microsoft.azure.kusto.data.ClientFactory;
|
||||
import com.microsoft.azure.kusto.data.StreamingClient;
|
||||
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@Tags({"Azure", "Data", "Explorer", "ADX", "Kusto"})
|
||||
@CapabilityDescription("Standard implementation of Kusto Query Service for Azure Data Explorer")
|
||||
public class StandardKustoQueryService extends AbstractControllerService implements KustoQueryService {
|
||||
|
||||
public static final PropertyDescriptor CLUSTER_URI = new PropertyDescriptor.Builder()
|
||||
.name("Cluster URI")
|
||||
.displayName("Cluster URI")
|
||||
.description("Azure Data Explorer Cluster URI")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("Authentication Strategy")
|
||||
.displayName("Authentication Strategy")
|
||||
.description("Authentication method for access to Azure Data Explorer")
|
||||
.required(true)
|
||||
.defaultValue(KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue())
|
||||
.allowableValues(KustoAuthenticationStrategy.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor APPLICATION_CLIENT_ID = new PropertyDescriptor.Builder()
|
||||
.name("Application Client ID")
|
||||
.displayName("Application Client ID")
|
||||
.description("Azure Data Explorer Application Client Identifier for Authentication")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor APPLICATION_TENANT_ID = new PropertyDescriptor.Builder()
|
||||
.name("Application Tenant ID")
|
||||
.displayName("Application Tenant ID")
|
||||
.description("Azure Data Explorer Application Tenant Identifier for Authentication")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor APPLICATION_KEY = new PropertyDescriptor.Builder()
|
||||
.name("Application Key")
|
||||
.displayName("Application Key")
|
||||
.description("Azure Data Explorer Application Key for Authentication")
|
||||
.required(true)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.dependsOn(AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue())
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Arrays.asList(
|
||||
CLUSTER_URI,
|
||||
AUTHENTICATION_STRATEGY,
|
||||
APPLICATION_CLIENT_ID,
|
||||
APPLICATION_TENANT_ID,
|
||||
APPLICATION_KEY
|
||||
);
|
||||
|
||||
private volatile StreamingClient kustoClient;
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTY_DESCRIPTORS;
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws ProcessException, URISyntaxException {
|
||||
if (this.kustoClient == null) {
|
||||
this.kustoClient = createClient(context);
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public final void onStopped() {
|
||||
if (this.kustoClient == null) {
|
||||
getLogger().debug("Kusto Client not configured");
|
||||
} else {
|
||||
try {
|
||||
this.kustoClient.close();
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Kusto Client close failed", e);
|
||||
}
|
||||
this.kustoClient = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public KustoQueryResponse executeQuery(final String databaseName, final String query) {
|
||||
Objects.requireNonNull(databaseName, "Database Name required");
|
||||
Objects.requireNonNull(query, "Query required");
|
||||
|
||||
KustoQueryResponse kustoQueryResponse;
|
||||
|
||||
try {
|
||||
final InputStream responseStream = this.kustoClient.executeStreamingQuery(databaseName, query);
|
||||
kustoQueryResponse = new KustoQueryResponse(responseStream);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Kusto Query execution failed", e);
|
||||
kustoQueryResponse = new KustoQueryResponse(true, e.getMessage());
|
||||
}
|
||||
return kustoQueryResponse;
|
||||
}
|
||||
|
||||
protected StreamingClient createClient(final ConfigurationContext context) throws URISyntaxException {
|
||||
final ConnectionStringBuilder connectionStringBuilder = getConnectionStringBuilder(context);
|
||||
return ClientFactory.createStreamingClient(connectionStringBuilder);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private ConnectionStringBuilder getConnectionStringBuilder(final ConfigurationContext context) {
|
||||
final ConnectionStringBuilder builder;
|
||||
|
||||
final String clusterUrl = context.getProperty(CLUSTER_URI).getValue();
|
||||
final String clientId = context.getProperty(APPLICATION_CLIENT_ID).getValue();
|
||||
|
||||
final KustoAuthenticationStrategy kustoAuthenticationStrategy = KustoAuthenticationStrategy.valueOf(context.getProperty(AUTHENTICATION_STRATEGY).getValue());
|
||||
|
||||
if (KustoAuthenticationStrategy.MANAGED_IDENTITY == kustoAuthenticationStrategy) {
|
||||
builder = ConnectionStringBuilder.createWithAadManagedIdentity(clusterUrl, clientId);
|
||||
} else {
|
||||
final String applicationKey = context.getProperty(APPLICATION_KEY).getValue();
|
||||
final String tenantId = context.getProperty(APPLICATION_TENANT_ID).getValue();
|
||||
builder = ConnectionStringBuilder.createWithAadApplicationCredentials(clusterUrl, clientId, applicationKey, tenantId);
|
||||
}
|
||||
|
||||
final String vendor = System.getProperty("java.vendor");
|
||||
final String version = System.getProperty("java.version");
|
||||
|
||||
builder.setConnectorDetails(vendor, version, null, null, false, null);
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -16,6 +16,7 @@ org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink
|
|||
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
|
||||
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerServiceLookup
|
||||
org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
|
||||
org.apache.nifi.services.azure.data.explorer.StandardKustoQueryService
|
||||
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
|
||||
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
|
||||
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService
|
||||
|
|
|
@ -20,6 +20,7 @@ org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
|
|||
org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
|
||||
org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
|
||||
org.apache.nifi.processors.azure.cosmos.document.PutAzureCosmosDBRecord
|
||||
org.apache.nifi.processors.azure.data.explorer.QueryAzureDataExplorer
|
||||
org.apache.nifi.processors.azure.storage.ListAzureBlobStorage_v12
|
||||
org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
|
||||
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.data.explorer;
|
||||
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.services.azure.data.explorer.KustoQueryResponse;
|
||||
import org.apache.nifi.services.azure.data.explorer.KustoQueryService;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class QueryAzureDataExplorerTest {
|
||||
private static final String SERVICE_ID = KustoQueryService.class.getName();
|
||||
|
||||
private static final String DATABASE_NAME = "records";
|
||||
|
||||
private static final String QUERY = ".show table testing";
|
||||
|
||||
private static final byte[] EMPTY = new byte[]{};
|
||||
|
||||
private static final String ERROR_MESSAGE = "Results not found";
|
||||
|
||||
private static final String EMPTY_ARRAY = "[]";
|
||||
|
||||
@Mock
|
||||
private KustoQueryService kustoQueryService;
|
||||
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeEach
|
||||
void setRunner() throws InitializationException {
|
||||
runner = TestRunners.newTestRunner(QueryAzureDataExplorer.class);
|
||||
|
||||
when(kustoQueryService.getIdentifier()).thenReturn(SERVICE_ID);
|
||||
runner.addControllerService(SERVICE_ID, kustoQueryService);
|
||||
runner.enableControllerService(kustoQueryService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testProperties() {
|
||||
runner.assertNotValid();
|
||||
|
||||
runner.setProperty(QueryAzureDataExplorer.KUSTO_QUERY_SERVICE, SERVICE_ID);
|
||||
runner.setProperty(QueryAzureDataExplorer.DATABASE_NAME, DATABASE_NAME);
|
||||
runner.setProperty(QueryAzureDataExplorer.QUERY, QUERY);
|
||||
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunFailure() {
|
||||
runner.setProperty(QueryAzureDataExplorer.KUSTO_QUERY_SERVICE, SERVICE_ID);
|
||||
runner.setProperty(QueryAzureDataExplorer.DATABASE_NAME, DATABASE_NAME);
|
||||
runner.setProperty(QueryAzureDataExplorer.QUERY, QUERY);
|
||||
|
||||
runner.enqueue(EMPTY);
|
||||
|
||||
final KustoQueryResponse kustoQueryResponse = new KustoQueryResponse(true, ERROR_MESSAGE);
|
||||
when(kustoQueryService.executeQuery(eq(DATABASE_NAME), eq(QUERY))).thenReturn(kustoQueryResponse);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(QueryAzureDataExplorer.FAILURE);
|
||||
|
||||
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryAzureDataExplorer.FAILURE).iterator().next();
|
||||
flowFile.assertAttributeEquals(QueryAzureDataExplorer.QUERY_ERROR_MESSAGE, ERROR_MESSAGE);
|
||||
flowFile.assertAttributeEquals(QueryAzureDataExplorer.QUERY_EXECUTED, QUERY);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunSuccess() {
|
||||
runner.setProperty(QueryAzureDataExplorer.KUSTO_QUERY_SERVICE, SERVICE_ID);
|
||||
runner.setProperty(QueryAzureDataExplorer.DATABASE_NAME, DATABASE_NAME);
|
||||
runner.setProperty(QueryAzureDataExplorer.QUERY, QUERY);
|
||||
|
||||
runner.enqueue(EMPTY);
|
||||
|
||||
final ByteArrayInputStream inputStream = new ByteArrayInputStream(EMPTY_ARRAY.getBytes(StandardCharsets.UTF_8));
|
||||
final KustoQueryResponse kustoQueryResponse = new KustoQueryResponse(inputStream);
|
||||
when(kustoQueryService.executeQuery(eq(DATABASE_NAME), eq(QUERY))).thenReturn(kustoQueryResponse);
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(QueryAzureDataExplorer.SUCCESS);
|
||||
|
||||
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryAzureDataExplorer.SUCCESS).iterator().next();
|
||||
flowFile.assertAttributeEquals(QueryAzureDataExplorer.QUERY_EXECUTED, QUERY);
|
||||
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), QueryAzureDataExplorer.APPLICATION_JSON);
|
||||
flowFile.assertContentEquals(EMPTY_ARRAY);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.data.explorer;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public class StandardKustoQueryServiceTest {
|
||||
private static final String SERVICE_ID = StandardKustoQueryServiceTest.class.getSimpleName();
|
||||
|
||||
private static final String CLUSTER_URI = "https://cluster.region.kusto.windows.net";
|
||||
|
||||
private static final String APPLICATION_CLIENT_ID = String.class.getSimpleName();
|
||||
|
||||
private StandardKustoQueryService service;
|
||||
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeEach
|
||||
void setRunner() {
|
||||
runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
service = new StandardKustoQueryService();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testProperties() throws InitializationException {
|
||||
runner.addControllerService(SERVICE_ID, service);
|
||||
|
||||
runner.assertNotValid(service);
|
||||
|
||||
runner.setProperty(service, StandardKustoQueryService.CLUSTER_URI, CLUSTER_URI);
|
||||
runner.setProperty(service, StandardKustoQueryService.APPLICATION_CLIENT_ID, APPLICATION_CLIENT_ID);
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEnableManagedIdentity() throws InitializationException {
|
||||
runner.addControllerService(SERVICE_ID, service);
|
||||
|
||||
runner.setProperty(service, StandardKustoQueryService.CLUSTER_URI, CLUSTER_URI);
|
||||
runner.setProperty(service, StandardKustoQueryService.AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.MANAGED_IDENTITY.getValue());
|
||||
runner.setProperty(service, StandardKustoQueryService.APPLICATION_CLIENT_ID, APPLICATION_CLIENT_ID);
|
||||
|
||||
runner.assertValid(service);
|
||||
|
||||
runner.enableControllerService(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testEnableApplicationCredentials() throws InitializationException {
|
||||
runner.addControllerService(SERVICE_ID, service);
|
||||
|
||||
runner.setProperty(service, StandardKustoQueryService.CLUSTER_URI, CLUSTER_URI);
|
||||
runner.setProperty(service, StandardKustoQueryService.AUTHENTICATION_STRATEGY, KustoAuthenticationStrategy.APPLICATION_CREDENTIALS.getValue());
|
||||
runner.setProperty(service, StandardKustoQueryService.APPLICATION_CLIENT_ID, APPLICATION_CLIENT_ID);
|
||||
runner.setProperty(service, StandardKustoQueryService.APPLICATION_KEY, UUID.randomUUID().toString());
|
||||
runner.setProperty(service, StandardKustoQueryService.APPLICATION_TENANT_ID, UUID.randomUUID().toString());
|
||||
|
||||
runner.assertValid(service);
|
||||
|
||||
runner.enableControllerService(service);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.data.explorer;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Objects;
|
||||
|
||||
public class KustoQueryResponse {
|
||||
|
||||
private final InputStream responseStream;
|
||||
|
||||
private final boolean error;
|
||||
|
||||
private final String errorMessage;
|
||||
|
||||
public KustoQueryResponse(final boolean error, final String errorMessage) {
|
||||
this.responseStream = null;
|
||||
this.error = error;
|
||||
this.errorMessage = errorMessage;
|
||||
}
|
||||
|
||||
public KustoQueryResponse(final InputStream responseStream) {
|
||||
this.responseStream = Objects.requireNonNull(responseStream, "Response Stream required");
|
||||
this.error = false;
|
||||
this.errorMessage = null;
|
||||
}
|
||||
|
||||
public InputStream getResponseStream() {
|
||||
return responseStream;
|
||||
}
|
||||
|
||||
public boolean isError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
public String getErrorMessage() {
|
||||
return errorMessage;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.services.azure.data.explorer;
|
||||
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
|
||||
public interface KustoQueryService extends ControllerService {
|
||||
/**
|
||||
* Execute Query on Azure Data Explorer
|
||||
*
|
||||
* @param databaseName Database Name for query
|
||||
* @param query Query to be executed
|
||||
* @return Query Response containing stream or error status
|
||||
*/
|
||||
KustoQueryResponse executeQuery(String databaseName, String query);
|
||||
}
|
Loading…
Reference in New Issue