From 80f49eb7bdcd761cdf42631d4d9378c4004fb4e7 Mon Sep 17 00:00:00 2001 From: sjyang18 Date: Mon, 4 May 2020 22:58:36 +0000 Subject: [PATCH] NIFI-7406 Added PutAzureCosmosRecord processor for Azure Cosmos DB This closes #4253 Signed-off-by: Joey Frazee --- .../nifi-azure-processors/pom.xml | 102 ++++++ .../AbstractAzureCosmosDBProcessor.java | 304 ++++++++++++++++++ .../cosmos/document/AzureCosmosDBUtils.java | 63 ++++ .../document/PutAzureCosmosDBRecord.java | 221 +++++++++++++ .../document/AzureCosmosDBClientService.java | 163 ++++++++++ ...g.apache.nifi.controller.ControllerService | 3 +- .../org.apache.nifi.processor.Processor | 3 +- .../ITAbstractAzureCosmosDBDocument.java | 216 +++++++++++++ .../document/ITPutAzureCosmosDBRecord.java | 165 ++++++++++ .../azure/cosmos/document/MockTestBase.java | 74 +++++ .../document/PutAzureCosmosDBRecordTest.java | 294 +++++++++++++++++ .../TestAzureCosmosDBClientService.java | 72 +++++ .../nifi-azure-services-api/pom.xml | 37 +++ .../AzureCosmosDBConnectionService.java | 33 ++ nifi-nar-bundles/nifi-azure-bundle/pom.xml | 3 +- 15 files changed, 1750 insertions(+), 3 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 253a43d9aa..58c0ccc859 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -60,11 +60,23 @@ com.azure azure-core ${azure.core.version} + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + + com.azure azure-identity 1.0.6 + + + com.azure + azure-core + + com.microsoft.azure @@ -75,15 +87,74 @@ com.microsoft.azure azure-eventhubs-eph ${azure-eventhubs-eph.version} + + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + com.microsoft.azure azure-storage + + + com.fasterxml.jackson.core + jackson-core + + + + + com.azure + azure-cosmos + ${azure-cosmos.version} + + + com.azure + azure-core + + + com.google.guava + guava + + com.azure azure-storage-file-datalake 12.2.0 + + + com.azure + azure-core + + + + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.google.guava + guava + 27.0.1-jre + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + ${jackson.version} org.apache.nifi @@ -97,6 +168,37 @@ 1.13.0-SNAPSHOT test + + org.apache.nifi + nifi-avro-record-utils + 1.13.0-SNAPSHOT + test + + + org.apache.nifi + nifi-json-utils + 1.13.0-SNAPSHOT + test + + + + org.apache.nifi + nifi-schema-registry-service-api + 1.13.0-SNAPSHOT + test + + + org.apache.nifi + nifi-record-serialization-services + 1.13.0-SNAPSHOT + test + + + org.mockito + mockito-inline + 3.3.3 + test + org.apache.nifi nifi-processor-utils diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java new file mode 100644 index 0000000000..b92098a3c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AbstractAzureCosmosDBProcessor.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.azure.cosmos.document; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosContainerResponse; +import com.azure.cosmos.models.CosmosDatabaseResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService; + +public abstract class AbstractAzureCosmosDBProcessor extends AbstractProcessor { + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are written to Cosmos DB are routed to this relationship") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All FlowFiles that cannot be written to Cosmos DB are routed to this relationship") + .build(); + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("All input FlowFiles that are part of a successful are routed to this relationship") + .build(); + + static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-connection-service") + .displayName("Cosmos DB Connection Service") + .description("If configured, the controller service used to obtain the connection string and access key") + .required(false) + .identifiesControllerService(AzureCosmosDBConnectionService.class) + .build(); + + static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-name") + .displayName("Cosmos DB Name") + .description("The database name or id. This is used as the namespace for document collections or containers") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CONTAINER_ID = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-container-id") + .displayName("Cosmos DB Container ID") + .description("The unique identifier for the container") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-partition-key") + .displayName("Cosmos DB Partition Key") + .description("The partition key used to distribute data among servers") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("charactor-set") + .displayName("Charactor Set") + .description("The Character Set in which the data is encoded") + .required(false) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + static final List descriptors; + + static { + List _temp = new ArrayList<>(); + _temp.add(CONNECTION_SERVICE); + _temp.add(AzureCosmosDBUtils.URI); + _temp.add(AzureCosmosDBUtils.DB_ACCESS_KEY); + _temp.add(AzureCosmosDBUtils.CONSISTENCY); + _temp.add(DATABASE_NAME); + _temp.add(CONTAINER_ID); + _temp.add(PARTITION_KEY); + descriptors = Collections.unmodifiableList(_temp); + } + + private CosmosClient cosmosClient; + private CosmosContainer container; + private AzureCosmosDBConnectionService connectionService; + + @OnScheduled + public void onScheduled(final ProcessContext context) throws CosmosException { + final ComponentLog logger = getLogger(); + + if (context.getProperty(CONNECTION_SERVICE).isSet()) { + this.connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(AzureCosmosDBConnectionService.class); + this.cosmosClient = this.connectionService.getCosmosClient(); + } else { + final String uri = context.getProperty(AzureCosmosDBUtils.URI).getValue(); + final String accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue(); + final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue(); + final ConsistencyLevel clevel; + switch (selectedConsistency) { + case AzureCosmosDBUtils.CONSISTENCY_STRONG: + clevel = ConsistencyLevel.STRONG; + break; + case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX: + clevel = ConsistencyLevel.CONSISTENT_PREFIX; + break; + case AzureCosmosDBUtils.CONSISTENCY_SESSION: + clevel = ConsistencyLevel.SESSION; + break; + case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS: + clevel = ConsistencyLevel.BOUNDED_STALENESS; + break; + case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL: + clevel = ConsistencyLevel.EVENTUAL; + break; + default: + clevel = ConsistencyLevel.SESSION; + } + if (cosmosClient != null) { + onStopped(); + } + if (logger.isDebugEnabled()) { + logger.debug("Creating CosmosClient"); + } + createCosmosClient(uri, accessKey, clevel); + } + getCosmosDocumentContainer(context); + doPostActionOnSchedule(context); + } + + protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) { + this.cosmosClient = new CosmosClientBuilder() + .endpoint(uri) + .key(accessKey) + .consistencyLevel(clevel) + .buildClient(); + } + + protected abstract void doPostActionOnSchedule(final ProcessContext context); + + protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException { + final String databaseName = context.getProperty(DATABASE_NAME).getValue(); + final String containerID = context.getProperty(CONTAINER_ID).getValue(); + final String partitionKey = context.getProperty(PARTITION_KEY).getValue(); + + final CosmosDatabaseResponse databaseResponse = this.cosmosClient.createDatabaseIfNotExists(databaseName); + final CosmosDatabase database = this.cosmosClient.getDatabase(databaseResponse.getProperties().getId()); + + final CosmosContainerProperties containerProperties = + new CosmosContainerProperties(containerID, "/"+partitionKey); + + // Create container by default if Not exists. + final CosmosContainerResponse containerResponse = database.createContainerIfNotExists(containerProperties); + this.container = database.getContainer(containerResponse.getProperties().getId()); + } + + @OnStopped + public final void onStopped() { + final ComponentLog logger = getLogger(); + if (connectionService == null && cosmosClient != null) { + // close client only when cosmoclient is created in Processor. + if(logger.isDebugEnabled()) { + logger.debug("Closing CosmosClient"); + } + try{ + this.container = null; + this.cosmosClient.close(); + }catch(CosmosException e) { + logger.error("Error closing Cosmos DB client due to {}", new Object[] { e.getMessage() }, e); + } finally { + this.cosmosClient = null; + } + } + } + + protected String getURI(final ProcessContext context) { + if (this.connectionService != null) { + return this.connectionService.getURI(); + } else { + return context.getProperty(AzureCosmosDBUtils.URI).getValue(); + } + } + + protected String getAccessKey(final ProcessContext context) { + if (this.connectionService != null) { + return this.connectionService.getAccessKey(); + } else { + return context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue(); + } + } + + protected String getConsistencyLevel(final ProcessContext context) { + if (this.connectionService != null) { + return this.connectionService.getConsistencyLevel(); + } else { + return context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue(); + } + } + + @Override + protected Collection customValidate(ValidationContext context) { + List retVal = new ArrayList<>(); + + boolean connectionServiceIsSet = context.getProperty(CONNECTION_SERVICE).isSet(); + boolean uriIsSet = context.getProperty(AzureCosmosDBUtils.URI).isSet(); + boolean accessKeyIsSet = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).isSet(); + boolean databaseIsSet = context.getProperty(DATABASE_NAME).isSet(); + boolean collectionIsSet = context.getProperty(CONTAINER_ID).isSet(); + boolean partitionIsSet = context.getProperty(PARTITION_KEY).isSet(); + + if (connectionServiceIsSet && (uriIsSet || accessKeyIsSet) ) { + // If connection Service is set, None of the Processor variables URI and accessKey + // should be set. + final String msg = String.format( + "If connection service is used for DB connection, none of %s and %s should be set", + AzureCosmosDBUtils.URI.getDisplayName(), + AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName() + ); + retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); + } else if (!connectionServiceIsSet && (!uriIsSet || !accessKeyIsSet)) { + // If connection Service is not set, Both of the Processor variable URI and accessKey + // should be set. + final String msg = String.format( + "If connection service is not used for DB connection, both %s and %s should be set", + AzureCosmosDBUtils.URI.getDisplayName(), + AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName() + ); + retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); + } + if (!databaseIsSet) { + final String msg = AbstractAzureCosmosDBProcessor.DATABASE_NAME.getDisplayName() + " must be set."; + retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); + } + if (!collectionIsSet) { + final String msg = AbstractAzureCosmosDBProcessor.CONTAINER_ID.getDisplayName() + " must be set."; + retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); + } + if (!partitionIsSet) { + final String msg = AbstractAzureCosmosDBProcessor.PARTITION_KEY.getDisplayName() + " must be set."; + retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build()); + } + return retVal; + } + + protected CosmosClient getCosmosClient() { + return cosmosClient; + } + + protected void setCosmosClient(CosmosClient cosmosClient) { + this.cosmosClient = cosmosClient; + } + + protected CosmosContainer getContainer() { + return container; + } + + protected void setContainer(CosmosContainer container) { + this.container = container; + } + + protected AzureCosmosDBConnectionService getConnectionService() { + return connectionService; + } + + protected void setConnectionService(AzureCosmosDBConnectionService connectionService) { + this.connectionService = connectionService; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java new file mode 100644 index 0000000000..3b4fe220d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/AzureCosmosDBUtils.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.azure.cosmos.document; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.util.StandardValidators; + +public final class AzureCosmosDBUtils { + public static final String CONSISTENCY_STRONG = "STRONG"; + public static final String CONSISTENCY_BOUNDED_STALENESS= "BOUNDED_STALENESS"; + public static final String CONSISTENCY_SESSION = "SESSION"; + public static final String CONSISTENCY_CONSISTENT_PREFIX = "CONSISTENT_PREFIX"; + public static final String CONSISTENCY_EVENTUAL = "EVENTUAL"; + + public static final PropertyDescriptor URI = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-uri") + .displayName("Cosmos DB URI") + .description("Cosmos DB URI, typically in the form of https://{databaseaccount}.documents.azure.com:443/" + + " Note this host URL is for Cosmos DB with Core SQL API" + + " from Azure Portal (Overview->URI)") + .required(false) + .addValidator(StandardValidators.URI_VALIDATOR) + .sensitive(true) + .build(); + + public static final PropertyDescriptor DB_ACCESS_KEY = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-key") + .displayName("Cosmos DB Access Key") + .description("Cosmos DB Access Key from Azure Portal (Settings->Keys). " + + "Choose a read-write key to enable database or container creation at run time") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .sensitive(true) + .build(); + + public static final PropertyDescriptor CONSISTENCY = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-consistency-level") + .displayName("Cosmos DB Consistency Level") + .description("Choose from five consistency levels on the consistency spectrum. " + + "Refer to Cosmos DB documentation for their differences") + .required(false) + .defaultValue(CONSISTENCY_SESSION) + .allowableValues(CONSISTENCY_STRONG, CONSISTENCY_BOUNDED_STALENESS, CONSISTENCY_SESSION, + CONSISTENCY_CONSISTENT_PREFIX, CONSISTENCY_EVENTUAL) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java new file mode 100644 index 0000000000..cfbb221d12 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecord.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.cosmos.document; + +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.ConflictException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +@EventDriven +@Tags({ "azure", "cosmos", "insert", "record", "put" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and " + + "schema to read an incoming record set from the body of a Flowfile and then inserts those records into " + + "a configured Cosmos DB Container.") +@SystemResourceConsideration(resource = SystemResource.MEMORY) +public class PutAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor { + + private String conflictHandlingStrategy; + static final AllowableValue IGNORE_CONFLICT = new AllowableValue("IGNORE", "Ignore", "Conflicting records will not be inserted, and FlowFile will not be routed to failure"); + static final AllowableValue UPSERT_CONFLICT = new AllowableValue("UPSERT", "Upsert", "Conflicting records will be upserted, and FlowFile will not be routed to failure"); + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor INSERT_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("insert-batch-size") + .displayName("Insert Batch Size") + .description("The number of records to group together for one single insert operation against Cosmos DB") + .defaultValue("20") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor CONFLICT_HANDLE_STRATEGY = new PropertyDescriptor.Builder() + .name("azure-cosmos-db-conflict-handling-strategy") + .displayName("Cosmos DB Conflict Handling Strategy") + .description("Choose whether to ignore or upsert when conflict error occurs during insertion") + .required(false) + .allowableValues(IGNORE_CONFLICT, UPSERT_CONFLICT) + .defaultValue(IGNORE_CONFLICT.getValue()) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + + private final static Set relationships; + private final static List propertyDescriptors; + + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(RECORD_READER_FACTORY); + _propertyDescriptors.add(INSERT_BATCH_SIZE); + _propertyDescriptors.add(CONFLICT_HANDLE_STRATEGY); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + final Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + protected void bulkInsert(final List> records) throws CosmosException{ + // In the future, this method will be replaced by calling createItems API + // for example, this.container.createItems(records); + // currently, no createItems API available in Azure Cosmos Java SDK + final ComponentLog logger = getLogger(); + final CosmosContainer container = getContainer(); + for (Map record : records){ + try { + container.createItem(record); + } catch (ConflictException e) { + // insert with unique id is expected. In case conflict occurs, use the selected strategy. + // By default, it will ignore. + if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())){ + container.upsertItem(record); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Ignoring duplicate based on selected conflict resolution strategy"); + } + } + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) + .asControllerService(RecordReaderFactory.class); + + final String partitionKeyField = context.getProperty(PARTITION_KEY).getValue(); + List> batch = new ArrayList<>(); + int ceiling = context.getProperty(INSERT_BATCH_SIZE).asInteger(); + boolean error = false; + try (final InputStream inStream = session.read(flowFile); + final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) { + + RecordSchema schema = reader.getSchema(); + Record record; + + while ((record = reader.nextRecord()) != null) { + // Convert each Record to HashMap + Map contentMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(schema)); + if(contentMap.containsKey("id")) { + final Object idObj = contentMap.get("id"); + final String idStr = (idObj == null) ? "" : String.valueOf(idObj); + if (idObj == null || StringUtils.isBlank(idStr)) { + // dont put null to id + contentMap.put("id", UUID.randomUUID().toString()); + } else { + contentMap.put("id", idStr); + } + } else { + contentMap.put("id", UUID.randomUUID().toString()); + } + if (!contentMap.containsKey(partitionKeyField)) { + logger.error(String.format("PutAzureCosmoDBRecord failed with missing partitionKeyField (%s)", partitionKeyField)); + error = true; + break; + } + batch.add(contentMap); + if (batch.size() == ceiling) { + bulkInsert(batch); + batch = new ArrayList<>(); + } + } + if (!error && batch.size() > 0) { + bulkInsert(batch); + } + } catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) { + logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e); + error = true; + } finally { + if (!error) { + session.getProvenanceReporter().send(flowFile, getURI(context)); + session.transfer(flowFile, REL_SUCCESS); + } else { + session.transfer(flowFile, REL_FAILURE); + } + } + session.commit(); + } + + @Override + protected void doPostActionOnSchedule(final ProcessContext context) { + conflictHandlingStrategy = context.getProperty(CONFLICT_HANDLE_STRATEGY).getValue(); + if (conflictHandlingStrategy == null) + conflictHandlingStrategy = IGNORE_CONFLICT.getValue(); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java new file mode 100644 index 0000000000..c56abb073a --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/cosmos/document/AzureCosmosDBClientService.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.services.azure.cosmos.document; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosException; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils; +import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService; +import org.apache.nifi.util.StringUtils; + +@Tags({"azure", "cosmos", "document", "service"}) +@CapabilityDescription( + "Provides a controller service that configures a connection to Cosmos DB (Core SQL API) " + + " and provides access to that connection to other Cosmos DB-related components." +) +public class AzureCosmosDBClientService extends AbstractControllerService implements AzureCosmosDBConnectionService { + private String uri; + private String accessKey; + private String consistencyLevel; + private CosmosClient cosmosClient; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.uri = context.getProperty(AzureCosmosDBUtils.URI).getValue(); + this.accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue(); + final ConsistencyLevel clevel; + final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue(); + + switch(selectedConsistency) { + case AzureCosmosDBUtils.CONSISTENCY_STRONG: + clevel = ConsistencyLevel.STRONG; + break; + case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX: + clevel = ConsistencyLevel.CONSISTENT_PREFIX; + break; + case AzureCosmosDBUtils.CONSISTENCY_SESSION: + clevel = ConsistencyLevel.SESSION; + break; + case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS: + clevel = ConsistencyLevel.BOUNDED_STALENESS; + break; + case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL: + clevel = ConsistencyLevel.EVENTUAL; + break; + default: + clevel = ConsistencyLevel.SESSION; + } + + if (this.cosmosClient != null) { + onStopped(); + } + consistencyLevel = clevel.toString(); + createCosmosClient(uri, accessKey, clevel); + } + + + @OnStopped + public final void onStopped() { + if (this.cosmosClient != null) { + try { + cosmosClient.close(); + } catch(CosmosException e) { + getLogger().error("Closing cosmosClient Failed: " + e.getMessage(), e); + } finally { + this.cosmosClient = null; + } + } + } + + protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){ + this.cosmosClient = new CosmosClientBuilder() + .endpoint(uri) + .key(accessKey) + .consistencyLevel(clevel) + .buildClient(); + } + + static List descriptors = new ArrayList<>(); + + static { + descriptors.add(AzureCosmosDBUtils.URI); + descriptors.add(AzureCosmosDBUtils.DB_ACCESS_KEY); + descriptors.add(AzureCosmosDBUtils.CONSISTENCY); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public String getURI() { + return this.uri; + } + + @Override + public String getAccessKey() { + return this.accessKey; + } + @Override + public String getConsistencyLevel() { + return this.consistencyLevel; + } + + @Override + public CosmosClient getCosmosClient() { + return this.cosmosClient; + } + public void setCosmosClient(CosmosClient client) { + this.cosmosClient = client; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(); + + final String uri = validationContext.getProperty(AzureCosmosDBUtils.URI).getValue(); + final String accessKey = validationContext.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue(); + + if (StringUtils.isBlank(uri) || StringUtils.isBlank(accessKey)) { + results.add(new ValidationResult.Builder() + .subject("AzureStorageCredentialsControllerService") + .valid(false) + .explanation( + "either " + AzureCosmosDBUtils.URI.getDisplayName() + + " or " + AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName() + " is required") + .build()); + } + return results; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 4179771dfc..5e63d4b190 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -14,4 +14,5 @@ # limitations under the License. org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup -org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService \ No newline at end of file +org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService +org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 015596e386..6456dd0b39 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -24,4 +24,5 @@ org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage -org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage +org.apache.nifi.processors.azure.cosmos.document.PutAzureCosmosDBRecord diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java new file mode 100644 index 0000000000..06bc335681 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITAbstractAzureCosmosDBDocument.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.cosmos.document; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Properties; +import java.util.logging.Logger; + +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosDatabase; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosContainerResponse; +import com.azure.cosmos.models.CosmosDatabaseResponse; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.util.CosmosPagedIterable; +import com.fasterxml.jackson.databind.JsonNode; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +public abstract class ITAbstractAzureCosmosDBDocument { + static Logger logger = Logger.getLogger(ITAbstractAzureCosmosDBDocument.class.getName()); + + private static final Properties CONFIG; + + private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; + protected static final String TEST_COSMOS_DB_NAME = "nifi-test-db"; + protected static final String TEST_COSMOS_CONTAINER_NAME = "nifi-test-container"; + protected static final String TEST_COSMOS_PARTITION_KEY_FIELD_NAME = "category"; + protected static CosmosClient client; + protected static CosmosDatabase cdb; + protected static CosmosContainer container; + + static { + final FileInputStream fis; + CONFIG = new Properties(); + try { + fis = new FileInputStream(CREDENTIALS_FILE); + try { + CONFIG.load(fis); + } catch (IOException e) { + fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); + } finally { + FileUtils.closeQuietly(fis); + } + } catch (FileNotFoundException e) { + fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); + } + } + + protected static String getComosURI() { + return CONFIG.getProperty("cosmosURI"); + } + + protected static String getCosmosKey() { + return CONFIG.getProperty("cosmosKey"); + } + + protected TestRunner runner; + + @BeforeClass + public static void createTestDBContainerIfNeeded() throws CosmosException { + final String testDBURI = getComosURI(); + final String testDBContainer = getCosmosKey(); + + client = new CosmosClientBuilder() + .endpoint(testDBURI) + .key(testDBContainer) + .buildClient(); + + CosmosDatabaseResponse databaseResponse = client.createDatabase(TEST_COSMOS_DB_NAME); + cdb = client.getDatabase(databaseResponse.getProperties().getId()); + CosmosContainerProperties containerProperties = + new CosmosContainerProperties(TEST_COSMOS_CONTAINER_NAME, "/"+TEST_COSMOS_PARTITION_KEY_FIELD_NAME); + CosmosContainerResponse containerResponse = cdb.createContainer(containerProperties); + container = cdb.getContainer(containerResponse.getProperties().getId()); + assertNotNull(container); + } + + @AfterClass + public static void dropTestDBAndContainer() throws CosmosException { + resetTestCosmosConnection(); + if (container != null) { + try { + container.delete(); + } catch(CosmosException e) { + logger.info(e.getMessage()); + } finally { + container = null; + } + } + if (cdb != null) { + try { + cdb.delete(); + } catch(CosmosException e) { + logger.info(e.getMessage()); + } finally { + cdb = null; + } + } + if (client != null){ + try { + client.close(); + } catch(CosmosException e) { + logger.info(e.getMessage()); + } finally { + client = null; + } + } + } + + @Before + public void setUpCosmosIT() { + final String testDBURI = getComosURI(); + final String testDBContainer = getCosmosKey(); + runner = TestRunners.newTestRunner(getProcessorClass()); + runner.setProperty(AzureCosmosDBUtils.URI, testDBURI); + runner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, testDBContainer); + runner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, TEST_COSMOS_DB_NAME); + runner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID, TEST_COSMOS_CONTAINER_NAME); + runner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY, TEST_COSMOS_PARTITION_KEY_FIELD_NAME); + runner.setIncomingConnection(false); + runner.setNonLoopConnection(false); + } + + protected static void closeClient() { + try { + client.close(); + } catch(CosmosException e){ + logger.info(e.getMessage()); + } finally { + client =null; + cdb = null; + container = null; + } + } + + protected static void resetTestCosmosConnection() { + if (client != null) { + closeClient(); + } + final String testDBURI = getComosURI(); + final String testDBContainer = getCosmosKey(); + + client = new CosmosClientBuilder() + .endpoint(testDBURI) + .key(testDBContainer) + .buildClient(); + cdb = client.getDatabase(TEST_COSMOS_DB_NAME); + container = cdb.getContainer(TEST_COSMOS_CONTAINER_NAME); + } + + protected abstract Class getProcessorClass(); + + protected void configureCosmosConnectionControllerService() throws Exception { + runner.removeProperty(AzureCosmosDBUtils.URI); + runner.removeProperty(AzureCosmosDBUtils.DB_ACCESS_KEY); + + AzureCosmosDBClientService service = new AzureCosmosDBClientService(); + runner.addControllerService("connService", service); + + runner.setProperty(service, AzureCosmosDBUtils.URI,getComosURI()); + runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, getCosmosKey()); + // now, after enabling and setting the service, it should be valid + runner.enableControllerService(service); + runner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, service.getIdentifier()); + runner.assertValid(); + } + + protected void clearTestData() throws Exception { + logger.info("clearing test data"); + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions(); + + CosmosPagedIterable response = container.queryItems( + "select * from c order by c._ts", queryOptions, JsonNode.class ); + + response.forEach(data -> { + if (data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME) != null){ + PartitionKey pkey = new PartitionKey(data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME).asText()); + container.deleteItem(data.get("id").asText(), pkey, new CosmosItemRequestOptions()); + } else { + container.deleteItem(data.get("id").asText(), PartitionKey.NONE, new CosmosItemRequestOptions()); + } + }); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java new file mode 100644 index 0000000000..242827c09e --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/ITPutAzureCosmosDBRecord.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.cosmos.document; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.logging.Logger; + +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.util.CosmosPagedIterable; +import com.fasterxml.jackson.databind.JsonNode; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ITPutAzureCosmosDBRecord extends ITAbstractAzureCosmosDBDocument { + static Logger logger = Logger.getLogger(ITPutAzureCosmosDBRecord.class.getName()); + + @Override + protected Class getProcessorClass() { + return PutAzureCosmosDBRecord.class; + } + + @Before + public void setUp() throws Exception { + resetTestCosmosConnection(); + } + + @After + public void cleanupTestCase() { + try{ + clearTestData(); + closeClient(); + } catch(Exception e) { + + } + } + private List getDataFromTestDB() { + logger.info("getDataFromTestDB for test result validation"); + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions(); + List results = new ArrayList<>(); + + CosmosPagedIterable response = container.queryItems( + "select * from c order by c._ts", queryOptions, JsonNode.class ); + + response.forEach(data -> { + results.add(data); + }); + return results; + } + + private MockRecordParser recordReader; + + private void setupRecordReader() throws InitializationException { + recordReader = new MockRecordParser(); + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + runner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader"); + } + + @Test + public void testOnTriggerWithNestedRecords() throws InitializationException { + setupRecordReader(); + recordReader.addSchemaField("id", RecordFieldType.STRING); + recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING); + + final List personFields = new ArrayList<>(); + final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType()); + final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType()); + final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType()); + personFields.add(nameField); + personFields.add(ageField); + personFields.add(sportField); + final RecordSchema personSchema = new SimpleRecordSchema(personFields); + recordReader.addSchemaField("person", RecordFieldType.RECORD); + + recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = -3185956498135742190L; + { + put("name", "John Doe"); + put("age", 48); + put("sport", "Soccer"); + } + })); + recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = 1L; + { + put("name", "Jane Doe"); + put("age", 47); + put("sport", "Tennis"); + } + })); + recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = -1329194249439570573L; + { + put("name", "Sally Doe"); + put("age", 47); + put("sport", "Curling"); + } + })); + recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = -1329194249439570574L; + { + put("name", "Jimmy Doe"); + put("age", 14); + put("sport", null); + } + })); + + runner.enqueue(""); + runner.run(); + runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1); + assertEquals(4, getDataFromTestDB().size()); + } + + @Test + public void testOnTriggerWithFlatRecords() throws InitializationException { + setupRecordReader(); + recordReader.addSchemaField("id", RecordFieldType.STRING); + recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("1", "A", "John Doe", 48, "Soccer"); + recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis"); + recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling"); + recordReader.addRecord("4", "A", "Jimmy Doe", 14, null); + recordReader.addRecord("5", "C","Pizza Doe", 14, null); + + runner.enqueue(""); + runner.run(); + runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1); + assertEquals(5, getDataFromTestDB().size()); + } + + + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java new file mode 100644 index 0000000000..ca1414b261 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/MockTestBase.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.azure.cosmos.document; + +import static org.mockito.Mockito.mock; + +import java.util.Random; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClient; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService; +import org.apache.nifi.util.TestRunner; + +public class MockTestBase { + + protected static final String MOCK_DB_NAME = "MOCK_DB_NAME"; + protected static final String MOCK_CONTAINER_ID = "MOCK_CONTAINER_ID"; + protected static final String MOCK_URI = "MOCK_URI"; + protected static final String MOCK_DB_ACCESS_KEY = "MOCK_DB_ACCESS_KEY"; + public static final String MOCK_QUERY = "select * from c"; + + public static final String MOCK_PARTITION_FIELD_NAME = "category"; + protected TestRunner testRunner; + + protected void setBasicMockProperties(boolean withConnectionService) throws InitializationException { + if (testRunner != null) { + testRunner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, MOCK_DB_NAME); + testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID,MOCK_CONTAINER_ID); + testRunner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY,MOCK_PARTITION_FIELD_NAME); + if (withConnectionService) { + // setup connnection controller service + AzureCosmosDBClientService service = new MockConnectionService(); + testRunner.addControllerService("connService", service); + testRunner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI); + testRunner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY); + + // now, after enabling and setting the service, it should be valid + testRunner.enableControllerService(service); + testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, "connService"); + } + } + } + + private static Random random = new Random(); + public static int getRandomInt(int min, int max){ + return random.nextInt((max-min)+1) + min; + } + + private class MockConnectionService extends AzureCosmosDBClientService { + @Override + protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){ + // mock cosmos client + this.setCosmosClient(mock(CosmosClient.class)); + } + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java new file mode 100644 index 0000000000..4a6b8a8ef8 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/cosmos/document/PutAzureCosmosDBRecordTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.cosmos.document; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosContainer; +import com.azure.cosmos.CosmosException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockSchemaRegistry; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import net.minidev.json.JSONObject; +public class PutAzureCosmosDBRecordTest extends MockTestBase { + + private MockPutAzureCosmosDBRecord processor; + private MockRecordParser recordReader; + + private void setupRecordReader() throws InitializationException { + recordReader = new MockRecordParser(); + if (testRunner != null) { + testRunner.addControllerService("reader", recordReader); + testRunner.enableControllerService(recordReader); + testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader"); + } + + } + + @Before + public void setUp() throws Exception { + processor = new MockPutAzureCosmosDBRecord(); + testRunner = TestRunners.newTestRunner(processor); + testRunner.setIncomingConnection(false); + testRunner.setNonLoopConnection(false); + } + + @Test + public void testPutCosmosRecordProcessorConfigValidity() throws Exception { + setBasicMockProperties(false); + testRunner.setProperty(AzureCosmosDBUtils.URI, MOCK_URI); + testRunner.assertNotValid(); + testRunner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY); + + testRunner.assertNotValid(); + + setupRecordReader(); + testRunner.assertValid(); + processor.setCosmosClient(null); + processor.onScheduled(testRunner.getProcessContext()); + assertNotNull(processor.getCosmosClient()); + } + + @Test + public void testPutCosmosRecordProcessorConfigValidityWithConnectionService() throws Exception { + setBasicMockProperties(true); + testRunner.assertNotValid(); + // setup recordReader + setupRecordReader(); + testRunner.assertValid(); + processor.setCosmosClient(null); + processor.onScheduled(testRunner.getProcessContext()); + assertNotNull(processor.getCosmosClient()); + } + + @Test + public void testOnTriggerWithFlatRecords() throws Exception { + setupRecordReader(); + prepareMockTest(); + recordReader.addSchemaField("id", RecordFieldType.STRING); + recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("1", "A", "John Doe", 48, "Soccer"); + recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis"); + recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling"); + recordReader.addRecord("4", "A", "Jimmy Doe", 14, null); + recordReader.addRecord("5", "C","Pizza Doe", 14, null); + + testRunner.enqueue(""); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1); + assertEquals(5, processor.getTestResults().size()); + } + + @Test + public void testOnTriggerWithNestedRecords() throws Exception { + setupRecordReader(); + prepareMockTest(); + recordReader.addSchemaField("id", RecordFieldType.STRING); + recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING); + + final List personFields = new ArrayList<>(); + final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType()); + final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType()); + final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType()); + personFields.add(nameField); + personFields.add(ageField); + personFields.add(sportField); + final RecordSchema personSchema = new SimpleRecordSchema(personFields); + recordReader.addSchemaField("person", RecordFieldType.RECORD); + + recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = -3185956498135742190L; + { + put("name", "John Doe"); + put("age", 48); + put("sport", "Soccer"); + } + })); + recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = 1L; + { + put("name", "Jane Doe"); + put("age", 47); + put("sport", "Tennis"); + } + })); + recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = -1329194249439570573L; + { + put("name", "Sally Doe"); + put("age", 47); + put("sport", "Curling"); + } + })); + recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap() { + private static final long serialVersionUID = -1329194249439570574L; + { + put("name", "Jimmy Doe"); + put("age", 14); + put("sport", null); + } + })); + testRunner.enqueue(""); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1); + assertEquals(4, processor.getTestResults().size()); + } + + @Test + public void testArrayConversion() throws Exception { + Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create(); + // schema creation for test + JsonObject schemaDef = new JsonObject(); + schemaDef.addProperty("type", "record"); + schemaDef.addProperty("name", "Test"); + JsonArray schemaArray = new JsonArray(); + JsonObject f1 = new JsonObject(); + f1.addProperty("type", "string"); + f1.addProperty("name", "id"); + schemaArray.add(f1); + JsonObject f2 = new JsonObject(); + f2.addProperty("type", "string"); + f2.addProperty("name", "name"); + schemaArray.add(f2); + JsonObject f3 = new JsonObject(); + f3.addProperty("type", "string"); + f3.addProperty("name", "sport"); + schemaArray.add(f3); + JsonObject arrayDef = new JsonObject(); + arrayDef.addProperty("type", "array"); + arrayDef.addProperty("items", "string"); + JsonObject f4 = new JsonObject(); + f4.add("type", arrayDef); + f4.addProperty("name", "arrayTest"); + schemaArray.add(f4); + schemaDef.add("fields", schemaArray); + + // test data generation + JsonObject testData = new JsonObject(); + testData.addProperty("id", UUID.randomUUID().toString()); + testData.addProperty("name", "John Doe"); + testData.addProperty("sport", "Soccer"); + JsonArray jarray = new JsonArray(); + jarray.add("a"); + jarray.add("b"); + jarray.add("c"); + testData.add("arrayTest", jarray); + + // setup registry and reader + MockSchemaRegistry registry = new MockSchemaRegistry(); + RecordSchema rschema = AvroTypeUtil.createSchema(new Schema.Parser().parse(gson.toJson(schemaDef))); + registry.addSchema("test", rschema); + JsonTreeReader reader = new JsonTreeReader(); + testRunner.addControllerService("registry", registry); + testRunner.addControllerService("reader", reader); + testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry"); + testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader"); + testRunner.enableControllerService(registry); + testRunner.enableControllerService(reader); + prepareMockTest(); + // override partiton key for this test case + testRunner.setProperty(PutAzureCosmosDBRecord.PARTITION_KEY, "sport"); + + Map attrs = new HashMap<>(); + attrs.put("schema.name", "test"); + + testRunner.enqueue(gson.toJson(testData), attrs); + testRunner.run(); + + testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_FAILURE, 0); + testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_SUCCESS, 1); + List> backendData = processor.getTestResults(); + assertEquals(1, backendData.size()); + //validate array data + JSONObject arrayTestResult = new JSONObject(); + arrayTestResult.putAll(backendData.get(0)); + Object[] check = (Object []) arrayTestResult.get("arrayTest"); + assertArrayEquals(new Object[]{"a", "b", "c"}, check); + } + private void prepareMockTest() throws Exception { + // this setup connection service and basic mock properties + setBasicMockProperties(true); + } +} + +class MockPutAzureCosmosDBRecord extends PutAzureCosmosDBRecord { + + static CosmosClient mockClient = mock(CosmosClient.class); + static CosmosContainer mockContainer = mock(CosmosContainer.class); + private List> mockBackend = new ArrayList<>(); + + @Override + protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) { + this.setCosmosClient(mockClient); + } + @Override + protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException { + this.setContainer(mockContainer); + } + + @Override + protected void bulkInsert(List> records ) throws CosmosException{ + this.mockBackend.addAll(records); + } + + public List> getTestResults() { + return mockBackend; + } + + + public CosmosContainer getMockConainer() { + return mockContainer; + } + + + + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java new file mode 100644 index 0000000000..64e7cce8ca --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/cosmos/document/TestAzureCosmosDBClientService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.cosmos.document; + +import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestAzureCosmosDBClientService { + + private static final String MOCK_URI = "https://mockURI:443/"; + private static final String DB_ACCESS_KEY = "mockDB_ACCESS_KEY"; + + private TestRunner runner; + private AzureCosmosDBClientService service; + + @Before + public void setUp() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + service = new AzureCosmosDBClientService(); + runner.addControllerService("connService", service); + } + + @Test + public void testValidWithURIandDBAccessKey() { + configureURI(); + configureDBAccessKey(); + + runner.assertValid(service); + } + + @Test + public void testNotValidBecauseURIMissing() { + configureDBAccessKey(); + + runner.assertNotValid(service); + } + + @Test + public void testNotValidBecauseDBAccessKeyMissing() { + configureURI(); + + runner.assertNotValid(service); + } + + private void configureURI() { + runner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI); + } + + private void configureDBAccessKey() { + runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, DB_ACCESS_KEY); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml index ff6af32f6a..57dacf57dc 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml @@ -27,11 +27,38 @@ com.microsoft.azure azure-storage + + + com.fasterxml.jackson.core + jackson-core + + com.azure azure-core ${azure.core.version} + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + + + + + com.azure + azure-cosmos + ${azure-cosmos.version} + + + com.azure + azure-core + + + com.google.guava + guava + + @@ -39,6 +66,16 @@ jackson-core ${jackson.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + ${jackson.version} + + + com.google.guava + guava + 27.0.1-jre + org.apache.nifi nifi-api diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java new file mode 100644 index 0000000000..387e1f2db7 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/cosmos/AzureCosmosDBConnectionService.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.cosmos; + +import com.azure.cosmos.CosmosClient; + +import org.apache.nifi.controller.ControllerService; + +public interface AzureCosmosDBConnectionService extends ControllerService { + + String getURI(); + + String getAccessKey(); + + String getConsistencyLevel(); + + CosmosClient getCosmosClient(); + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml index 86897e5103..e1bee31501 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -27,8 +27,9 @@ 8.4.0 - 1.5.0 + 1.6.0 2.10.3 + 4.2.0