NIFI-7406 Added PutAzureCosmosDBRecord processor for Azure Cosmos DB

This closes #4253

Signed-off-by: Joey Frazee <jfrazee@apache.org>
sjyang18 2020-05-04 22:58:36 +00:00 committed by Joey Frazee
parent 2b461bbf29
commit 80f49eb7bd
15 changed files with 1750 additions and 3 deletions
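
For orientation, here is a hedged sketch (not part of the commit) of exercising the new processor with NiFi's TestRunner, mirroring the tests added below. The endpoint, key, database, and container values are placeholders, and a reachable Cosmos DB account with the Core SQL API is assumed; against such an account the processor creates the database and container if needed and inserts the parsed records.

package org.apache.nifi.processors.azure.cosmos.document;

import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class PutAzureCosmosDBRecordSketch {
    public static void main(final String[] args) throws Exception {
        // Placeholders: point these at a real Core SQL API account to run.
        final TestRunner runner = TestRunners.newTestRunner(PutAzureCosmosDBRecord.class);
        runner.setProperty(AzureCosmosDBUtils.URI, "https://myaccount.documents.azure.com:443/");
        runner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, "<read-write access key>");
        runner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, "nifi-test-db");
        runner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID, "nifi-test-container");
        runner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY, "category");

        // A record reader controller service is required; MockRecordParser keeps the sketch small.
        final MockRecordParser reader = new MockRecordParser();
        runner.addControllerService("reader", reader);
        runner.enableControllerService(reader);
        runner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
        reader.addSchemaField("id", RecordFieldType.STRING);
        reader.addSchemaField("category", RecordFieldType.STRING);
        reader.addRecord("1", "A");

        runner.enqueue("");
        runner.run();
        runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
    }
}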

@@ -60,11 +60,23 @@
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>${azure.core.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.0.6</version>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
@@ -75,15 +87,74 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>${azure-eventhubs-eph.version}</version>
<exclusions>
<!-- dependency resolution with com.azure sdk -->
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>${azure-cosmos.version}</version>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.2.0</version>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -97,6 +168,37 @@
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-utils</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>

@@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;
public abstract class AbstractAzureCosmosDBProcessor extends AbstractProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are written to Cosmos DB are routed to this relationship")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All FlowFiles that cannot be written to Cosmos DB are routed to this relationship")
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("All input FlowFiles that are part of a successful are routed to this relationship")
.build();
static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-connection-service")
.displayName("Cosmos DB Connection Service")
.description("If configured, the controller service used to obtain the connection string and access key")
.required(false)
.identifiesControllerService(AzureCosmosDBConnectionService.class)
.build();
static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-name")
.displayName("Cosmos DB Name")
.description("The database name or id. This is used as the namespace for document collections or containers")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor CONTAINER_ID = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-container-id")
.displayName("Cosmos DB Container ID")
.description("The unique identifier for the container")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-partition-key")
.displayName("Cosmos DB Partition Key")
.description("The partition key used to distribute data among servers")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("charactor-set")
.displayName("Charactor Set")
.description("The Character Set in which the data is encoded")
.required(false)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
static final List<PropertyDescriptor> descriptors;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.add(CONNECTION_SERVICE);
_temp.add(AzureCosmosDBUtils.URI);
_temp.add(AzureCosmosDBUtils.DB_ACCESS_KEY);
_temp.add(AzureCosmosDBUtils.CONSISTENCY);
_temp.add(DATABASE_NAME);
_temp.add(CONTAINER_ID);
_temp.add(PARTITION_KEY);
descriptors = Collections.unmodifiableList(_temp);
}
private CosmosClient cosmosClient;
private CosmosContainer container;
private AzureCosmosDBConnectionService connectionService;
@OnScheduled
public void onScheduled(final ProcessContext context) throws CosmosException {
final ComponentLog logger = getLogger();
if (context.getProperty(CONNECTION_SERVICE).isSet()) {
this.connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(AzureCosmosDBConnectionService.class);
this.cosmosClient = this.connectionService.getCosmosClient();
} else {
final String uri = context.getProperty(AzureCosmosDBUtils.URI).getValue();
final String accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
final ConsistencyLevel clevel;
switch (selectedConsistency) {
case AzureCosmosDBUtils.CONSISTENCY_STRONG:
clevel = ConsistencyLevel.STRONG;
break;
case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
clevel = ConsistencyLevel.CONSISTENT_PREFIX;
break;
case AzureCosmosDBUtils.CONSISTENCY_SESSION:
clevel = ConsistencyLevel.SESSION;
break;
case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
clevel = ConsistencyLevel.BOUNDED_STALENESS;
break;
case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
clevel = ConsistencyLevel.EVENTUAL;
break;
default:
clevel = ConsistencyLevel.SESSION;
}
if (cosmosClient != null) {
onStopped();
}
if (logger.isDebugEnabled()) {
logger.debug("Creating CosmosClient");
}
createCosmosClient(uri, accessKey, clevel);
}
getCosmosDocumentContainer(context);
doPostActionOnSchedule(context);
}
protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) {
this.cosmosClient = new CosmosClientBuilder()
.endpoint(uri)
.key(accessKey)
.consistencyLevel(clevel)
.buildClient();
}
protected abstract void doPostActionOnSchedule(final ProcessContext context);
protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException {
final String databaseName = context.getProperty(DATABASE_NAME).getValue();
final String containerID = context.getProperty(CONTAINER_ID).getValue();
final String partitionKey = context.getProperty(PARTITION_KEY).getValue();
final CosmosDatabaseResponse databaseResponse = this.cosmosClient.createDatabaseIfNotExists(databaseName);
final CosmosDatabase database = this.cosmosClient.getDatabase(databaseResponse.getProperties().getId());
final CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerID, "/"+partitionKey);
// Create the container if it does not already exist.
final CosmosContainerResponse containerResponse = database.createContainerIfNotExists(containerProperties);
this.container = database.getContainer(containerResponse.getProperties().getId());
}
@OnStopped
public final void onStopped() {
final ComponentLog logger = getLogger();
if (connectionService == null && cosmosClient != null) {
// Close the client only when the CosmosClient was created by this processor.
if (logger.isDebugEnabled()) {
logger.debug("Closing CosmosClient");
}
try {
this.container = null;
this.cosmosClient.close();
} catch (CosmosException e) {
logger.error("Error closing Cosmos DB client due to {}", new Object[] { e.getMessage() }, e);
} finally {
this.cosmosClient = null;
}
}
}
protected String getURI(final ProcessContext context) {
if (this.connectionService != null) {
return this.connectionService.getURI();
} else {
return context.getProperty(AzureCosmosDBUtils.URI).getValue();
}
}
protected String getAccessKey(final ProcessContext context) {
if (this.connectionService != null) {
return this.connectionService.getAccessKey();
} else {
return context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
}
}
protected String getConsistencyLevel(final ProcessContext context) {
if (this.connectionService != null) {
return this.connectionService.getConsistencyLevel();
} else {
return context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
List<ValidationResult> retVal = new ArrayList<>();
boolean connectionServiceIsSet = context.getProperty(CONNECTION_SERVICE).isSet();
boolean uriIsSet = context.getProperty(AzureCosmosDBUtils.URI).isSet();
boolean accessKeyIsSet = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).isSet();
boolean databaseIsSet = context.getProperty(DATABASE_NAME).isSet();
boolean collectionIsSet = context.getProperty(CONTAINER_ID).isSet();
boolean partitionIsSet = context.getProperty(PARTITION_KEY).isSet();
if (connectionServiceIsSet && (uriIsSet || accessKeyIsSet)) {
// If the connection service is set, neither the URI nor the access key property should be set.
final String msg = String.format(
"If the connection service is used for the DB connection, neither %s nor %s should be set",
AzureCosmosDBUtils.URI.getDisplayName(),
AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName()
);
retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
} else if (!connectionServiceIsSet && (!uriIsSet || !accessKeyIsSet)) {
// If the connection service is not set, both the URI and the access key properties must be set.
final String msg = String.format(
"If the connection service is not used for the DB connection, both %s and %s must be set",
AzureCosmosDBUtils.URI.getDisplayName(),
AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName()
);
retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
}
if (!databaseIsSet) {
final String msg = AbstractAzureCosmosDBProcessor.DATABASE_NAME.getDisplayName() + " must be set.";
retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
}
if (!collectionIsSet) {
final String msg = AbstractAzureCosmosDBProcessor.CONTAINER_ID.getDisplayName() + " must be set.";
retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
}
if (!partitionIsSet) {
final String msg = AbstractAzureCosmosDBProcessor.PARTITION_KEY.getDisplayName() + " must be set.";
retVal.add(new ValidationResult.Builder().valid(false).explanation(msg).build());
}
return retVal;
}
protected CosmosClient getCosmosClient() {
return cosmosClient;
}
protected void setCosmosClient(CosmosClient cosmosClient) {
this.cosmosClient = cosmosClient;
}
protected CosmosContainer getContainer() {
return container;
}
protected void setContainer(CosmosContainer container) {
this.container = container;
}
protected AzureCosmosDBConnectionService getConnectionService() {
return connectionService;
}
protected void setConnectionService(AzureCosmosDBConnectionService connectionService) {
this.connectionService = connectionService;
}
}

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
public final class AzureCosmosDBUtils {
public static final String CONSISTENCY_STRONG = "STRONG";
public static final String CONSISTENCY_BOUNDED_STALENESS= "BOUNDED_STALENESS";
public static final String CONSISTENCY_SESSION = "SESSION";
public static final String CONSISTENCY_CONSISTENT_PREFIX = "CONSISTENT_PREFIX";
public static final String CONSISTENCY_EVENTUAL = "EVENTUAL";
public static final PropertyDescriptor URI = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-uri")
.displayName("Cosmos DB URI")
.description("Cosmos DB URI, typically in the form of https://{databaseaccount}.documents.azure.com:443/"
+ " Note this host URL is for Cosmos DB with Core SQL API"
+ " from Azure Portal (Overview->URI)")
.required(false)
.addValidator(StandardValidators.URI_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor DB_ACCESS_KEY = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-key")
.displayName("Cosmos DB Access Key")
.description("Cosmos DB Access Key from Azure Portal (Settings->Keys). "
+ "Choose a read-write key to enable database or container creation at run time")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor CONSISTENCY = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-consistency-level")
.displayName("Cosmos DB Consistency Level")
.description("Choose from five consistency levels on the consistency spectrum. "
+ "Refer to Cosmos DB documentation for their differences")
.required(false)
.defaultValue(CONSISTENCY_SESSION)
.allowableValues(CONSISTENCY_STRONG, CONSISTENCY_BOUNDED_STALENESS, CONSISTENCY_SESSION,
CONSISTENCY_CONSISTENT_PREFIX, CONSISTENCY_EVENTUAL)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
}
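
The string-to-ConsistencyLevel switch that consumes these constants is duplicated verbatim in AbstractAzureCosmosDBProcessor.onScheduled and AzureCosmosDBClientService.onEnabled. A hedged sketch of a helper (hypothetical, not in this commit) that both could call, keeping the same SESSION default:

package org.apache.nifi.processors.azure.cosmos.document;

import com.azure.cosmos.ConsistencyLevel;

// Hypothetical helper, not part of this commit: centralizes the duplicated
// consistency-level mapping, defaulting to SESSION exactly as both copies do.
final class ConsistencyLevelMapper {
    static ConsistencyLevel fromName(final String selectedConsistency) {
        if (selectedConsistency == null) {
            return ConsistencyLevel.SESSION;
        }
        switch (selectedConsistency) {
            case AzureCosmosDBUtils.CONSISTENCY_STRONG:
                return ConsistencyLevel.STRONG;
            case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
                return ConsistencyLevel.CONSISTENT_PREFIX;
            case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
                return ConsistencyLevel.BOUNDED_STALENESS;
            case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
                return ConsistencyLevel.EVENTUAL;
            case AzureCosmosDBUtils.CONSISTENCY_SESSION:
            default:
                return ConsistencyLevel.SESSION;
        }
    }

    private ConsistencyLevelMapper() {
    }
}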

@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ConflictException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@EventDriven
@Tags({ "azure", "cosmos", "insert", "record", "put" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and " +
"schema to read an incoming record set from the body of a Flowfile and then inserts those records into " +
"a configured Cosmos DB Container.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutAzureCosmosDBRecord extends AbstractAzureCosmosDBProcessor {
private String conflictHandlingStrategy;
static final AllowableValue IGNORE_CONFLICT = new AllowableValue("IGNORE", "Ignore", "Conflicting records will not be inserted, and FlowFile will not be routed to failure");
static final AllowableValue UPSERT_CONFLICT = new AllowableValue("UPSERT", "Upsert", "Conflicting records will be upserted, and FlowFile will not be routed to failure");
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor INSERT_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("insert-batch-size")
.displayName("Insert Batch Size")
.description("The number of records to group together for one single insert operation against Cosmos DB")
.defaultValue("20")
.required(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor CONFLICT_HANDLE_STRATEGY = new PropertyDescriptor.Builder()
.name("azure-cosmos-db-conflict-handling-strategy")
.displayName("Cosmos DB Conflict Handling Strategy")
.description("Choose whether to ignore or upsert when conflict error occurs during insertion")
.required(false)
.allowableValues(IGNORE_CONFLICT, UPSERT_CONFLICT)
.defaultValue(IGNORE_CONFLICT.getValue())
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(RECORD_READER_FACTORY);
_propertyDescriptors.add(INSERT_BATCH_SIZE);
_propertyDescriptors.add(CONFLICT_HANDLE_STRATEGY);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
protected void bulkInsert(final List<Map<String, Object>> records) throws CosmosException {
// In the future, this method may be replaced by a single bulk call such as
// this.container.createItems(records); no such createItems API is currently
// available in the Azure Cosmos Java SDK, so items are inserted one at a time.
final ComponentLog logger = getLogger();
final CosmosContainer container = getContainer();
for (Map<String, Object> record : records){
try {
container.createItem(record);
} catch (ConflictException e) {
// Inserts are expected to have unique ids. When a conflict occurs, apply the
// selected strategy; by default the conflicting record is ignored.
if (conflictHandlingStrategy != null && conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())) {
container.upsertItem(record);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
}
}
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
.asControllerService(RecordReaderFactory.class);
final String partitionKeyField = context.getProperty(PARTITION_KEY).getValue();
List<Map<String, Object>> batch = new ArrayList<>();
int ceiling = context.getProperty(INSERT_BATCH_SIZE).asInteger();
boolean error = false;
try (final InputStream inStream = session.read(flowFile);
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
RecordSchema schema = reader.getSchema();
Record record;
while ((record = reader.nextRecord()) != null) {
// Convert each Record to HashMap
Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(schema));
if (contentMap.containsKey("id")) {
final Object idObj = contentMap.get("id");
final String idStr = (idObj == null) ? "" : String.valueOf(idObj);
if (idObj == null || StringUtils.isBlank(idStr)) {
// don't write a null or blank id; generate one instead
contentMap.put("id", UUID.randomUUID().toString());
} else {
contentMap.put("id", idStr);
}
} else {
contentMap.put("id", UUID.randomUUID().toString());
}
if (!contentMap.containsKey(partitionKeyField)) {
logger.error(String.format("PutAzureCosmosDBRecord failed with missing partitionKeyField (%s)", partitionKeyField));
error = true;
break;
}
batch.add(contentMap);
if (batch.size() == ceiling) {
bulkInsert(batch);
batch = new ArrayList<>();
}
}
if (!error && batch.size() > 0) {
bulkInsert(batch);
}
} catch (SchemaNotFoundException | MalformedRecordException | IOException | CosmosException e) {
logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e);
error = true;
} finally {
if (!error) {
session.getProvenanceReporter().send(flowFile, getURI(context));
session.transfer(flowFile, REL_SUCCESS);
} else {
session.transfer(flowFile, REL_FAILURE);
}
}
session.commit();
}
@Override
protected void doPostActionOnSchedule(final ProcessContext context) {
conflictHandlingStrategy = context.getProperty(CONFLICT_HANDLE_STRATEGY).getValue();
if (conflictHandlingStrategy == null) {
conflictHandlingStrategy = IGNORE_CONFLICT.getValue();
}
}
}
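
To make the record-to-document mapping in onTrigger concrete, here is a hedged illustration (field names and values are examples, not from the commit) of the map handed to container.createItem for a single flat record when the Partition Key property is set to "category":

package org.apache.nifi.processors.azure.cosmos.document;

import java.util.HashMap;
import java.util.Map;

// Illustrative only: one flat record becomes a map like this. A blank or missing
// "id" is replaced by a random UUID in onTrigger; a record missing the configured
// partition key field ("category" here) routes the whole FlowFile to failure instead.
final class DocumentShapeExample {
    static Map<String, Object> sampleDocument() {
        final Map<String, Object> document = new HashMap<>();
        document.put("id", "1");          // or a generated UUID when the record has none
        document.put("category", "A");    // must match the configured partition key field
        document.put("name", "John Doe");
        document.put("age", 48);
        return document;
    }

    private DocumentShapeExample() {
    }
}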

@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.cosmos.document;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils;
import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;
import org.apache.nifi.util.StringUtils;
@Tags({"azure", "cosmos", "document", "service"})
@CapabilityDescription(
"Provides a controller service that configures a connection to Cosmos DB (Core SQL API) " +
" and provides access to that connection to other Cosmos DB-related components."
)
public class AzureCosmosDBClientService extends AbstractControllerService implements AzureCosmosDBConnectionService {
private String uri;
private String accessKey;
private String consistencyLevel;
private CosmosClient cosmosClient;
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.uri = context.getProperty(AzureCosmosDBUtils.URI).getValue();
this.accessKey = context.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
final ConsistencyLevel clevel;
final String selectedConsistency = context.getProperty(AzureCosmosDBUtils.CONSISTENCY).getValue();
switch(selectedConsistency) {
case AzureCosmosDBUtils.CONSISTENCY_STRONG:
clevel = ConsistencyLevel.STRONG;
break;
case AzureCosmosDBUtils.CONSISTENCY_CONSISTENT_PREFIX:
clevel = ConsistencyLevel.CONSISTENT_PREFIX;
break;
case AzureCosmosDBUtils.CONSISTENCY_SESSION:
clevel = ConsistencyLevel.SESSION;
break;
case AzureCosmosDBUtils.CONSISTENCY_BOUNDED_STALENESS:
clevel = ConsistencyLevel.BOUNDED_STALENESS;
break;
case AzureCosmosDBUtils.CONSISTENCY_EVENTUAL:
clevel = ConsistencyLevel.EVENTUAL;
break;
default:
clevel = ConsistencyLevel.SESSION;
}
if (this.cosmosClient != null) {
onStopped();
}
consistencyLevel = clevel.toString();
createCosmosClient(uri, accessKey, clevel);
}
@OnStopped
public final void onStopped() {
if (this.cosmosClient != null) {
try {
cosmosClient.close();
} catch(CosmosException e) {
getLogger().error("Closing cosmosClient Failed: " + e.getMessage(), e);
} finally {
this.cosmosClient = null;
}
}
}
protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){
this.cosmosClient = new CosmosClientBuilder()
.endpoint(uri)
.key(accessKey)
.consistencyLevel(clevel)
.buildClient();
}
static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
descriptors.add(AzureCosmosDBUtils.URI);
descriptors.add(AzureCosmosDBUtils.DB_ACCESS_KEY);
descriptors.add(AzureCosmosDBUtils.CONSISTENCY);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public String getURI() {
return this.uri;
}
@Override
public String getAccessKey() {
return this.accessKey;
}
@Override
public String getConsistencyLevel() {
return this.consistencyLevel;
}
@Override
public CosmosClient getCosmosClient() {
return this.cosmosClient;
}
public void setCosmosClient(CosmosClient client) {
this.cosmosClient = client;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String uri = validationContext.getProperty(AzureCosmosDBUtils.URI).getValue();
final String accessKey = validationContext.getProperty(AzureCosmosDBUtils.DB_ACCESS_KEY).getValue();
if (StringUtils.isBlank(uri) || StringUtils.isBlank(accessKey)) {
results.add(new ValidationResult.Builder()
.subject("AzureStorageCredentialsControllerService")
.valid(false)
.explanation(
"either " + AzureCosmosDBUtils.URI.getDisplayName()
+ " or " + AzureCosmosDBUtils.DB_ACCESS_KEY.getDisplayName() + " is required")
.build());
}
return results;
}
}
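
A hedged sketch (not part of the commit) of how another Cosmos DB-related component could reuse the client exposed by this controller service; the database and container names are placeholders, and the query pattern mirrors the integration tests later in this commit:

package org.apache.nifi.services.azure.cosmos.document;

import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.services.azure.cosmos.AzureCosmosDBConnectionService;

// Hypothetical consumer of the enabled controller service: it reuses the service's
// CosmosClient instead of building a new one. Names below are placeholders.
final class CosmosQueryExample {
    static void printAllItems(final AzureCosmosDBConnectionService service) {
        final CosmosClient client = service.getCosmosClient();
        final CosmosContainer container = client.getDatabase("nifi-test-db")
                .getContainer("nifi-test-container");
        container.queryItems("select * from c", new CosmosQueryRequestOptions(), JsonNode.class)
                .forEach(item -> System.out.println(item));
    }

    private CosmosQueryExample() {
    }
}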

@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService

@@ -24,4 +24,5 @@ org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage
org.apache.nifi.processors.azure.cosmos.document.PutAzureCosmosDBRecord

@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Logger;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosDatabaseResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
public abstract class ITAbstractAzureCosmosDBDocument {
static Logger logger = Logger.getLogger(ITAbstractAzureCosmosDBDocument.class.getName());
private static final Properties CONFIG;
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
protected static final String TEST_COSMOS_DB_NAME = "nifi-test-db";
protected static final String TEST_COSMOS_CONTAINER_NAME = "nifi-test-container";
protected static final String TEST_COSMOS_PARTITION_KEY_FIELD_NAME = "category";
protected static CosmosClient client;
protected static CosmosDatabase cdb;
protected static CosmosContainer container;
static {
final FileInputStream fis;
CONFIG = new Properties();
try {
fis = new FileInputStream(CREDENTIALS_FILE);
try {
CONFIG.load(fis);
} catch (IOException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
} finally {
FileUtils.closeQuietly(fis);
}
} catch (FileNotFoundException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
}
}
protected static String getCosmosURI() {
return CONFIG.getProperty("cosmosURI");
}
protected static String getCosmosKey() {
return CONFIG.getProperty("cosmosKey");
}
protected TestRunner runner;
@BeforeClass
public static void createTestDBContainerIfNeeded() throws CosmosException {
final String testDBURI = getCosmosURI();
final String testDBAccessKey = getCosmosKey();
client = new CosmosClientBuilder()
.endpoint(testDBURI)
.key(testDBAccessKey)
.buildClient();
CosmosDatabaseResponse databaseResponse = client.createDatabase(TEST_COSMOS_DB_NAME);
cdb = client.getDatabase(databaseResponse.getProperties().getId());
CosmosContainerProperties containerProperties =
new CosmosContainerProperties(TEST_COSMOS_CONTAINER_NAME, "/"+TEST_COSMOS_PARTITION_KEY_FIELD_NAME);
CosmosContainerResponse containerResponse = cdb.createContainer(containerProperties);
container = cdb.getContainer(containerResponse.getProperties().getId());
assertNotNull(container);
}
@AfterClass
public static void dropTestDBAndContainer() throws CosmosException {
resetTestCosmosConnection();
if (container != null) {
try {
container.delete();
} catch(CosmosException e) {
logger.info(e.getMessage());
} finally {
container = null;
}
}
if (cdb != null) {
try {
cdb.delete();
} catch(CosmosException e) {
logger.info(e.getMessage());
} finally {
cdb = null;
}
}
if (client != null){
try {
client.close();
} catch(CosmosException e) {
logger.info(e.getMessage());
} finally {
client = null;
}
}
}
@Before
public void setUpCosmosIT() {
final String testDBURI = getCosmosURI();
final String testDBAccessKey = getCosmosKey();
runner = TestRunners.newTestRunner(getProcessorClass());
runner.setProperty(AzureCosmosDBUtils.URI, testDBURI);
runner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, testDBAccessKey);
runner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, TEST_COSMOS_DB_NAME);
runner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID, TEST_COSMOS_CONTAINER_NAME);
runner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY, TEST_COSMOS_PARTITION_KEY_FIELD_NAME);
runner.setIncomingConnection(false);
runner.setNonLoopConnection(false);
}
protected static void closeClient() {
try {
client.close();
} catch (CosmosException e) {
logger.info(e.getMessage());
} finally {
client = null;
cdb = null;
container = null;
}
}
protected static void resetTestCosmosConnection() {
if (client != null) {
closeClient();
}
final String testDBURI = getCosmosURI();
final String testDBAccessKey = getCosmosKey();
client = new CosmosClientBuilder()
.endpoint(testDBURI)
.key(testDBAccessKey)
.buildClient();
cdb = client.getDatabase(TEST_COSMOS_DB_NAME);
container = cdb.getContainer(TEST_COSMOS_CONTAINER_NAME);
}
protected abstract Class<? extends Processor> getProcessorClass();
protected void configureCosmosConnectionControllerService() throws Exception {
runner.removeProperty(AzureCosmosDBUtils.URI);
runner.removeProperty(AzureCosmosDBUtils.DB_ACCESS_KEY);
AzureCosmosDBClientService service = new AzureCosmosDBClientService();
runner.addControllerService("connService", service);
runner.setProperty(service, AzureCosmosDBUtils.URI, getCosmosURI());
runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, getCosmosKey());
// now, after enabling and setting the service, it should be valid
runner.enableControllerService(service);
runner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, service.getIdentifier());
runner.assertValid();
}
protected void clearTestData() throws Exception {
logger.info("clearing test data");
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
CosmosPagedIterable<JsonNode> response = container.queryItems(
"select * from c order by c._ts", queryOptions, JsonNode.class );
response.forEach(data -> {
if (data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME) != null){
PartitionKey pkey = new PartitionKey(data.get(TEST_COSMOS_PARTITION_KEY_FIELD_NAME).asText());
container.deleteItem(data.get("id").asText(), pkey, new CosmosItemRequestOptions());
} else {
container.deleteItem(data.get("id").asText(), PartitionKey.NONE, new CosmosItemRequestOptions());
}
});
}
}
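
These integration tests read their credentials from ~/azure-credentials.PROPERTIES. A hypothetical example of that file follows; the property keys come from the getProperty calls above, while the values are placeholders for a real Core SQL API account with a read-write key.

# Example ~/azure-credentials.PROPERTIES expected by these integration tests (values are placeholders).
cosmosURI=https://myaccount.documents.azure.com:443/
cosmosKey=<primary or secondary read-write key>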

@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Logger;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ITPutAzureCosmosDBRecord extends ITAbstractAzureCosmosDBDocument {
static Logger logger = Logger.getLogger(ITPutAzureCosmosDBRecord.class.getName());
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureCosmosDBRecord.class;
}
@Before
public void setUp() throws Exception {
resetTestCosmosConnection();
}
@After
public void cleanupTestCase() {
try {
clearTestData();
closeClient();
} catch (Exception e) {
// best-effort cleanup; ignore failures so they do not mask test results
}
}
private List<JsonNode> getDataFromTestDB() {
logger.info("getDataFromTestDB for test result validation");
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
List<JsonNode> results = new ArrayList<>();
CosmosPagedIterable<JsonNode> response = container.queryItems(
"select * from c order by c._ts", queryOptions, JsonNode.class );
response.forEach(data -> {
results.add(data);
});
return results;
}
private MockRecordParser recordReader;
private void setupRecordReader() throws InitializationException {
recordReader = new MockRecordParser();
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
}
@Test
public void testOnTriggerWithNestedRecords() throws InitializationException {
setupRecordReader();
recordReader.addSchemaField("id", RecordFieldType.STRING);
recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING);
final List<RecordField> personFields = new ArrayList<>();
final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
personFields.add(nameField);
personFields.add(ageField);
personFields.add(sportField);
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = -3185956498135742190L;
{
put("name", "John Doe");
put("age", 48);
put("sport", "Soccer");
}
}));
recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = 1L;
{
put("name", "Jane Doe");
put("age", 47);
put("sport", "Tennis");
}
}));
recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = -1329194249439570573L;
{
put("name", "Sally Doe");
put("age", 47);
put("sport", "Curling");
}
}));
recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = -1329194249439570574L;
{
put("name", "Jimmy Doe");
put("age", 14);
put("sport", null);
}
}));
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
assertEquals(4, getDataFromTestDB().size());
}
@Test
public void testOnTriggerWithFlatRecords() throws InitializationException {
setupRecordReader();
recordReader.addSchemaField("id", RecordFieldType.STRING);
recordReader.addSchemaField(TEST_COSMOS_PARTITION_KEY_FIELD_NAME, RecordFieldType.STRING);
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING);
recordReader.addRecord("1", "A", "John Doe", 48, "Soccer");
recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis");
recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling");
recordReader.addRecord("4", "A", "Jimmy Doe", 14, null);
recordReader.addRecord("5", "C","Pizza Doe", 14, null);
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
assertEquals(5, getDataFromTestDB().size());
}
}

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import static org.mockito.Mockito.mock;
import java.util.Random;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService;
import org.apache.nifi.util.TestRunner;
public class MockTestBase {
protected static final String MOCK_DB_NAME = "MOCK_DB_NAME";
protected static final String MOCK_CONTAINER_ID = "MOCK_CONTAINER_ID";
protected static final String MOCK_URI = "MOCK_URI";
protected static final String MOCK_DB_ACCESS_KEY = "MOCK_DB_ACCESS_KEY";
public static final String MOCK_QUERY = "select * from c";
public static final String MOCK_PARTITION_FIELD_NAME = "category";
protected TestRunner testRunner;
protected void setBasicMockProperties(boolean withConnectionService) throws InitializationException {
if (testRunner != null) {
testRunner.setProperty(AbstractAzureCosmosDBProcessor.DATABASE_NAME, MOCK_DB_NAME);
testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONTAINER_ID, MOCK_CONTAINER_ID);
testRunner.setProperty(AbstractAzureCosmosDBProcessor.PARTITION_KEY, MOCK_PARTITION_FIELD_NAME);
if (withConnectionService) {
// set up the connection controller service
AzureCosmosDBClientService service = new MockConnectionService();
testRunner.addControllerService("connService", service);
testRunner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
testRunner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY);
// now, after enabling and setting the service, it should be valid
testRunner.enableControllerService(service);
testRunner.setProperty(AbstractAzureCosmosDBProcessor.CONNECTION_SERVICE, "connService");
}
}
}
private static Random random = new Random();
public static int getRandomInt(int min, int max){
return random.nextInt((max-min)+1) + min;
}
private class MockConnectionService extends AzureCosmosDBClientService {
@Override
protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel){
// mock cosmos client
this.setCosmosClient(mock(CosmosClient.class));
}
}
}

@@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.cosmos.document;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockSchemaRegistry;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import net.minidev.json.JSONObject;
public class PutAzureCosmosDBRecordTest extends MockTestBase {
private MockPutAzureCosmosDBRecord processor;
private MockRecordParser recordReader;
private void setupRecordReader() throws InitializationException {
recordReader = new MockRecordParser();
if (testRunner != null) {
testRunner.addControllerService("reader", recordReader);
testRunner.enableControllerService(recordReader);
testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
}
}
@Before
public void setUp() throws Exception {
processor = new MockPutAzureCosmosDBRecord();
testRunner = TestRunners.newTestRunner(processor);
testRunner.setIncomingConnection(false);
testRunner.setNonLoopConnection(false);
}
@Test
public void testPutCosmosRecordProcessorConfigValidity() throws Exception {
setBasicMockProperties(false);
testRunner.setProperty(AzureCosmosDBUtils.URI, MOCK_URI);
testRunner.assertNotValid();
testRunner.setProperty(AzureCosmosDBUtils.DB_ACCESS_KEY, MOCK_DB_ACCESS_KEY);
testRunner.assertNotValid();
setupRecordReader();
testRunner.assertValid();
processor.setCosmosClient(null);
processor.onScheduled(testRunner.getProcessContext());
assertNotNull(processor.getCosmosClient());
}
@Test
public void testPutCosmosRecordProcessorConfigValidityWithConnectionService() throws Exception {
setBasicMockProperties(true);
testRunner.assertNotValid();
// setup recordReader
setupRecordReader();
testRunner.assertValid();
processor.setCosmosClient(null);
processor.onScheduled(testRunner.getProcessContext());
assertNotNull(processor.getCosmosClient());
}
@Test
public void testOnTriggerWithFlatRecords() throws Exception {
setupRecordReader();
prepareMockTest();
recordReader.addSchemaField("id", RecordFieldType.STRING);
recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING);
recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING);
recordReader.addRecord("1", "A", "John Doe", 48, "Soccer");
recordReader.addRecord("2", "B","Jane Doe", 47, "Tennis");
recordReader.addRecord("3", "B", "Sally Doe", 47, "Curling");
recordReader.addRecord("4", "A", "Jimmy Doe", 14, null);
recordReader.addRecord("5", "C","Pizza Doe", 14, null);
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
assertEquals(5, processor.getTestResults().size());
}
@Test
public void testOnTriggerWithNestedRecords() throws Exception {
setupRecordReader();
prepareMockTest();
recordReader.addSchemaField("id", RecordFieldType.STRING);
recordReader.addSchemaField(MOCK_PARTITION_FIELD_NAME, RecordFieldType.STRING);
final List<RecordField> personFields = new ArrayList<>();
final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
personFields.add(nameField);
personFields.add(ageField);
personFields.add(sportField);
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
recordReader.addSchemaField("person", RecordFieldType.RECORD);
recordReader.addRecord("1", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = -3185956498135742190L;
{
put("name", "John Doe");
put("age", 48);
put("sport", "Soccer");
}
}));
recordReader.addRecord("2", "B", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = 1L;
{
put("name", "Jane Doe");
put("age", 47);
put("sport", "Tennis");
}
}));
recordReader.addRecord("3", "A", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = -1329194249439570573L;
{
put("name", "Sally Doe");
put("age", 47);
put("sport", "Curling");
}
}));
recordReader.addRecord("4", "C", new MapRecord(personSchema, new HashMap<String,Object>() {
private static final long serialVersionUID = -1329194249439570574L;
{
put("name", "Jimmy Doe");
put("age", 14);
put("sport", null);
}
}));
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
assertEquals(4, processor.getTestResults().size());
}
@Test
public void testArrayConversion() throws Exception {
Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
// schema creation for test
JsonObject schemaDef = new JsonObject();
schemaDef.addProperty("type", "record");
schemaDef.addProperty("name", "Test");
JsonArray schemaArray = new JsonArray();
JsonObject f1 = new JsonObject();
f1.addProperty("type", "string");
f1.addProperty("name", "id");
schemaArray.add(f1);
JsonObject f2 = new JsonObject();
f2.addProperty("type", "string");
f2.addProperty("name", "name");
schemaArray.add(f2);
JsonObject f3 = new JsonObject();
f3.addProperty("type", "string");
f3.addProperty("name", "sport");
schemaArray.add(f3);
JsonObject arrayDef = new JsonObject();
arrayDef.addProperty("type", "array");
arrayDef.addProperty("items", "string");
JsonObject f4 = new JsonObject();
f4.add("type", arrayDef);
f4.addProperty("name", "arrayTest");
schemaArray.add(f4);
schemaDef.add("fields", schemaArray);
// test data generation
JsonObject testData = new JsonObject();
testData.addProperty("id", UUID.randomUUID().toString());
testData.addProperty("name", "John Doe");
testData.addProperty("sport", "Soccer");
JsonArray jarray = new JsonArray();
jarray.add("a");
jarray.add("b");
jarray.add("c");
testData.add("arrayTest", jarray);
// set up the registry and reader
MockSchemaRegistry registry = new MockSchemaRegistry();
RecordSchema rschema = AvroTypeUtil.createSchema(new Schema.Parser().parse(gson.toJson(schemaDef)));
registry.addSchema("test", rschema);
JsonTreeReader reader = new JsonTreeReader();
testRunner.addControllerService("registry", registry);
testRunner.addControllerService("reader", reader);
testRunner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
testRunner.setProperty(PutAzureCosmosDBRecord.RECORD_READER_FACTORY, "reader");
testRunner.enableControllerService(registry);
testRunner.enableControllerService(reader);
prepareMockTest();
// override partition key for this test case
testRunner.setProperty(PutAzureCosmosDBRecord.PARTITION_KEY, "sport");
Map<String, String> attrs = new HashMap<>();
attrs.put("schema.name", "test");
testRunner.enqueue(gson.toJson(testData), attrs);
testRunner.run();
testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_FAILURE, 0);
testRunner.assertTransferCount(PutAzureCosmosDBRecord.REL_SUCCESS, 1);
List<Map<String, Object>> backendData = processor.getTestResults();
assertEquals(1, backendData.size());
// validate array data
JSONObject arrayTestResult = new JSONObject();
arrayTestResult.putAll(backendData.get(0));
Object[] check = (Object []) arrayTestResult.get("arrayTest");
assertArrayEquals(new Object[]{"a", "b", "c"}, check);
}
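For reference, the schema assembled programmatically in testArrayConversion is equivalent to the following Avro JSON literal. This is only an illustrative sketch, not part of the patch; parsing the literal directly yields the same RecordSchema as the Gson construction above:
// Sketch: equivalent Avro schema expressed as a JSON string literal.
String equivalentSchemaJson = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"sport\",\"type\":\"string\"},"
        + "{\"name\":\"arrayTest\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
RecordSchema equivalentSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(equivalentSchemaJson));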
private void prepareMockTest() throws Exception {
// this sets up the connection service and basic mock properties
setBasicMockProperties(true);
}
}
class MockPutAzureCosmosDBRecord extends PutAzureCosmosDBRecord {
static CosmosClient mockClient = mock(CosmosClient.class);
static CosmosContainer mockContainer = mock(CosmosContainer.class);
private List<Map<String, Object>> mockBackend = new ArrayList<>();
@Override
protected void createCosmosClient(final String uri, final String accessKey, final ConsistencyLevel clevel) {
this.setCosmosClient(mockClient);
}
@Override
protected void getCosmosDocumentContainer(final ProcessContext context) throws CosmosException {
this.setContainer(mockContainer);
}
@Override
protected void bulkInsert(List<Map<String, Object>> records) throws CosmosException {
this.mockBackend.addAll(records);
}
public List<Map<String, Object>> getTestResults() {
return mockBackend;
}
public CosmosContainer getMockContainer() {
return mockContainer;
}
}
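The mock above swaps the Cosmos client, the container, and the bulkInsert call for an in-memory list, so the tests can assert on what would have been written to the backend. A minimal wiring sketch, assuming the harness setup (setBasicMockProperties, record reader configuration) that lives outside this hunk:
// Sketch only: the real setup methods are defined elsewhere in the test class.
MockPutAzureCosmosDBRecord processor = new MockPutAzureCosmosDBRecord();
TestRunner testRunner = TestRunners.newTestRunner(processor);
// configure the record reader, connection service, and partition key as in the tests above, then:
testRunner.enqueue("");
testRunner.run();
assertEquals(expectedRecordCount, processor.getTestResults().size()); // expectedRecordCount is illustrative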

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.cosmos.document;
import org.apache.nifi.processors.azure.cosmos.document.AzureCosmosDBUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestAzureCosmosDBClientService {
private static final String MOCK_URI = "https://mockURI:443/";
private static final String DB_ACCESS_KEY = "mockDB_ACCESS_KEY";
private TestRunner runner;
private AzureCosmosDBClientService service;
@Before
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
service = new AzureCosmosDBClientService();
runner.addControllerService("connService", service);
}
@Test
public void testValidWithURIandDBAccessKey() {
configureURI();
configureDBAccessKey();
runner.assertValid(service);
}
@Test
public void testNotValidBecauseURIMissing() {
configureDBAccessKey();
runner.assertNotValid(service);
}
@Test
public void testNotValidBecauseDBAccessKeyMissing() {
configureURI();
runner.assertNotValid(service);
}
private void configureURI() {
runner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
}
private void configureDBAccessKey() {
runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, DB_ACCESS_KEY);
}
}
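Once the service validates, a test would typically enable it and point a processor at it by its identifier. A hedged sketch; the connection-service property descriptor on PutAzureCosmosDBRecord is assumed here, not taken from this hunk:
runner.setProperty(service, AzureCosmosDBUtils.URI, MOCK_URI);
runner.setProperty(service, AzureCosmosDBUtils.DB_ACCESS_KEY, DB_ACCESS_KEY);
runner.enableControllerService(service);
// runner.setProperty(processor, <connection-service descriptor>, "connService");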

View File

@ -27,11 +27,38 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>${azure.core.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>${azure-cosmos.version}</version>
<exclusions>
<exclusion>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
@ -39,6 +66,16 @@
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.cosmos;
import com.azure.cosmos.CosmosClient;
import org.apache.nifi.controller.ControllerService;
public interface AzureCosmosDBConnectionService extends ControllerService {
String getURI();
String getAccessKey();
String getConsistencyLevel();
CosmosClient getCosmosClient();
}
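A processor consuming this controller service would typically resolve it from a property and reuse the shared CosmosClient. A minimal sketch, assuming a property descriptor named CONNECTION_SERVICE (the actual descriptor is defined in the processor, not in this interface):
AzureCosmosDBConnectionService connService = context.getProperty(CONNECTION_SERVICE)
        .asControllerService(AzureCosmosDBConnectionService.class);
CosmosClient client = connService.getCosmosClient();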

View File

@ -27,8 +27,9 @@
<properties>
<azure-storage.version>8.4.0</azure-storage.version>
<azure.core.version>1.5.0</azure.core.version>
<azure.core.version>1.6.0</azure.core.version>
<jackson.version>2.10.3</jackson.version>
<azure-cosmos.version>4.2.0</azure-cosmos.version>
</properties>
<modules>