diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 69de94bc80..43c210c5a9 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -121,24 +121,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("ssl-context-service") - .displayName("SSL Context Service") - .description("The SSL Context Service used to provide client certificate information for TLS/SSL " - + "connections.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); + .name("ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() - .name("ssl-client-auth") - .displayName("Client Auth") - .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " - + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " - + "has been defined and enabled.") - .required(false) - .allowableValues(SSLContextService.ClientAuth.values()) - .defaultValue("REQUIRED") - .build(); + .name("ssl-client-auth") + .displayName("Client Auth") + .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " + + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " + + "has been defined and enabled.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue("REQUIRED") + .build(); public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() .name("Write Concern") @@ -341,7 +341,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { } protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, - Map extraAttributes, Relationship rel) throws UnsupportedEncodingException { + Map extraAttributes, Relationship rel) throws UnsupportedEncodingException { String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue(); FlowFile flowFile = parent != null ? session.create(parent) : session.create(); diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java new file mode 100644 index 0000000000..c31b016a10 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public interface QueryHelper { + AllowableValue MODE_ONE_COMMIT = new AllowableValue("all-at-once", "Full Query Fetch", + "Fetch the entire query result and then make it available to downstream processors."); + AllowableValue MODE_MANY_COMMITS = new AllowableValue("streaming", "Stream Query Results", + "As soon as the query start sending results to the downstream processors at regular intervals."); + + PropertyDescriptor OPERATION_MODE = new PropertyDescriptor.Builder() + .name("mongo-operation-mode") + .displayName("Operation Mode") + .allowableValues(MODE_ONE_COMMIT, MODE_MANY_COMMITS) + .defaultValue(MODE_ONE_COMMIT.getValue()) + .required(true) + .description("This option controls when results are made available to downstream processors. If Stream Query Results is enabled, " + + "provenance will not be tracked relative to the input flowfile if an input flowfile is received and starts the query. In Stream Query Results mode " + + "errors will be handled by sending a new flowfile with the original content and attributes of the input flowfile to the failure " + + "relationship. 
Streaming should only be used if there is reliable connectivity between MongoDB and NiFi.") + .addValidator(Validator.VALID) + .build(); + + default String readQuery(ProcessContext context, ProcessSession session, PropertyDescriptor queryProp, FlowFile input) throws IOException { + String queryStr; + + if (context.getProperty(queryProp).isSet()) { + queryStr = context.getProperty(queryProp).evaluateAttributeExpressions(input).getValue(); + } else if (!context.getProperty(queryProp).isSet() && input == null) { + queryStr = "{}"; + } else { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + queryStr = new String(out.toByteArray()); + } + + return queryStr; + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java new file mode 100644 index 0000000000..48368b635c --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.mongodb.gridfs; + +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; +import org.bson.types.ObjectId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractGridFSProcessor extends AbstractProcessor { + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("gridfs-client-service") + .displayName("Client Service") + .description("The MongoDB client service to use for database connections.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .identifiesControllerService(MongoDBClientService.class) + .build(); + + static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("gridfs-database-name") + .displayName("Mongo Database Name") + .description("The name of the database to use") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder() + .name("gridfs-bucket-name") + .displayName("Bucket Name") + .description("The GridFS bucket where the files will be stored. 
If left blank, it will use the default value 'fs' " + + "that the MongoDB client driver uses.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(Validator.VALID) + .build(); + + static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder() + .name("gridfs-file-name") + .displayName("File Name") + .description("The name of the file in the bucket that is the target of this processor.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("mongo-query-attribute") + .displayName("Query Output Attribute") + .description("If set, the query will be written to a specified attribute on the output flowfiles.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .required(false) + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("When there is a failure processing the flowfile, it goes to this relationship.") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("When the operation succeeds, the flowfile is sent to this relationship.") + .build(); + + static final List PARENT_PROPERTIES; + + static final Set PARENT_RELATIONSHIPS; + + protected volatile MongoDBClientService clientService; + + static { + List _temp = new ArrayList<>(); + _temp.add(CLIENT_SERVICE); + _temp.add(DATABASE_NAME); + _temp.add(BUCKET_NAME); + PARENT_PROPERTIES = Collections.unmodifiableList(_temp); + + Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + PARENT_RELATIONSHIPS = Collections.unmodifiableSet(_rels); + } + + protected MongoDatabase getDatabase(FlowFile input, ProcessContext context) { + return clientService.getDatabase(context.getProperty(DATABASE_NAME) + .evaluateAttributeExpressions(input) + .getValue()); + } + + protected GridFSBucket getBucket(FlowFile input, ProcessContext context) { + final String name = getBucketName(input, context); + if (StringUtils.isEmpty(name)) { + return GridFSBuckets.create(getDatabase(input, context)); + } else { + return GridFSBuckets.create(getDatabase(input, context), name); + } + } + + protected String getBucketName(FlowFile input, ProcessContext context) { + return context.getProperty(BUCKET_NAME).isSet() + ? context.getProperty(BUCKET_NAME).evaluateAttributeExpressions(input).getValue() + : null; + } + + protected String getTransitUri(ObjectId id, FlowFile input, ProcessContext context) { + String bucket = getBucketName(input, context); + String uri = clientService.getURI(); + return new StringBuilder() + .append(uri) + .append(uri.endsWith("/") ? 
"" : "/") + .append(bucket) + .append("/") + .append(id.toString()) + .toString(); + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java new file mode 100644 index 0000000000..680731b367 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb.gridfs; + +import com.mongodb.client.MongoCursor; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.model.GridFSFile; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.JsonValidator; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; +import org.bson.Document; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@CapabilityDescription("Deletes a file from GridFS using a file name or a query.") +@Tags({"gridfs", "delete", "mongodb"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public class DeleteGridFS extends AbstractGridFSProcessor { + static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("delete-gridfs-query") + .displayName("Query") + .description("A valid MongoDB query to use to find and delete one or more files from GridFS.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(JsonValidator.INSTANCE) + .required(false) + .build(); + + static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder() + .name("gridfs-file-name") + .displayName("File Name") + .description("The name of the file in the bucket 
that is the target of this processor. GridFS file names do not " + + "include path information because GridFS does not sort files into folders within a bucket.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final List DESCRIPTORS; + + static { + List _temp = new ArrayList<>(); + _temp.addAll(PARENT_PROPERTIES); + _temp.add(FILE_NAME); + _temp.add(QUERY); + _temp.add(QUERY_ATTRIBUTE); + DESCRIPTORS = Collections.unmodifiableList(_temp); + } + + @Override + public Set getRelationships() { + return new HashSet<>(PARENT_RELATIONSHIPS); + } + + @Override + public final List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + ArrayList problems = new ArrayList<>(); + + boolean fileName = validationContext.getProperty(FILE_NAME).isSet(); + boolean query = validationContext.getProperty(QUERY).isSet(); + + if (fileName && query) { + problems.add(new ValidationResult.Builder() + .valid(false) + .explanation("File name and Query cannot be set at the same time.") + .build() + ); + } else if (!fileName && !query) { + problems.add(new ValidationResult.Builder() + .valid(false) + .explanation("File name or Query must be set, but not both at the same time.") + .build() + ); + } + + return problems; + } + + private String getQuery(ProcessContext context, FlowFile input) { + String queryString; + if (context.getProperty(FILE_NAME).isSet()) { + String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue(); + queryString = String.format("{ \"filename\": \"%s\"}", fileName); + } else { + queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); + } + + return queryString; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile input = session.get(); + if (input == null) { + return; + } + + final String deleteQuery = getQuery(context, input); + final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet() + ? 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() + : null; + GridFSBucket bucket = getBucket(input, context); + + try { + Document query = Document.parse(deleteQuery); + MongoCursor cursor = bucket.find(query).iterator(); + if (cursor.hasNext()) { + GridFSFile file = (GridFSFile)cursor.next(); + bucket.delete(file.getObjectId()); + + if (!StringUtils.isEmpty(queryAttribute)) { + input = session.putAttribute(input, queryAttribute, deleteQuery); + } + + session.transfer(input, REL_SUCCESS); + } else { + getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName())); + session.transfer(input, REL_FAILURE); + } + + cursor.close(); + } catch (Exception ex) { + getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex); + session.transfer(input, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java new file mode 100644 index 0000000000..11d4b87b02 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.mongodb.gridfs; + +import com.mongodb.client.MongoCursor; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.model.GridFSFile; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.JsonValidator; +import org.apache.nifi.processors.mongodb.QueryHelper; +import org.apache.nifi.util.StringUtils; +import org.bson.Document; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes( + @WritesAttribute(attribute = "gridfs.file.metadata", description = "The custom metadata stored with a file is attached to this property if it exists.") +) +@Tags({"fetch", "gridfs", "mongo"}) +@CapabilityDescription("Retrieves one or more files from a GridFS bucket by file name or by a user-defined query.") +public class FetchGridFS extends AbstractGridFSProcessor implements QueryHelper { + + static final String METADATA_ATTRIBUTE = "gridfs.file.metadata"; + + static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("gridfs-query") + .displayName("Query") + .description("A valid MongoDB query to use to fetch one or more files from GridFS.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(JsonValidator.INSTANCE) + .required(false) + .build(); + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original input flowfile goes to this relationship if the query does not cause an error") + .build(); + + static final List PROPERTY_DESCRIPTORS; + static final Set RELATIONSHIP_SET; + + static { + List _temp = new ArrayList<>(); + _temp.addAll(PARENT_PROPERTIES); + _temp.add(FILE_NAME); + _temp.add(QUERY); + _temp.add(QUERY_ATTRIBUTE); + _temp.add(OPERATION_MODE); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_temp); + + Set _rels = new HashSet<>(); + _rels.addAll(PARENT_RELATIONSHIPS); + _rels.add(REL_ORIGINAL); + RELATIONSHIP_SET = Collections.unmodifiableSet(_rels); + } + + @Override + public Set getRelationships() { + return RELATIONSHIP_SET; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + private String getQuery(ProcessSession session, ProcessContext context, FlowFile input) throws IOException { + String queryString; + if (context.getProperty(FILE_NAME).isSet()) { + String fileName = 
context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue(); + queryString = String.format("{ \"filename\": \"%s\"}", fileName); + } else if (context.getProperty(QUERY).isSet()) { + queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); + } else { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + queryString = new String(out.toByteArray()); + } + + return queryString; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile input = session.get(); + if (input == null) { + return; + } + + final String operatingMode = context.getProperty(OPERATION_MODE).getValue(); + final Map originalAttributes = input.getAttributes(); + + String queryStr; + try { + queryStr = getQuery(session, context, input); + if (StringUtils.isEmpty(queryStr)) { + getLogger().error("No query could be found or built from the supplied input."); + session.transfer(input, REL_FAILURE); + return; + } + } catch (IOException ex) { + getLogger().error("No query could be found from supplied input", ex); + session.transfer(input, REL_FAILURE); + return; + } + + Document query = Document.parse(queryStr); + + try { + final GridFSBucket bucket = getBucket(input, context); + final String queryPtr = queryStr; + final FlowFile parent = operatingMode.equals(MODE_ONE_COMMIT.getValue()) ? input : null; + + MongoCursor it = bucket.find(query).iterator(); + if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) { + session.transfer(input, REL_ORIGINAL); + input = null; + } + + while (it.hasNext()) { + GridFSFile gridFSFile = (GridFSFile)it.next(); + handleFile(bucket, session, context, parent, gridFSFile, queryPtr); + + if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) { + session.commit(); + } + } + + if (input != null) { + session.transfer(input, REL_ORIGINAL); + } + } catch (Exception ex) { + getLogger().error("An error occurred wile trying to run the query.", ex); + if (input != null && operatingMode.equals(MODE_ONE_COMMIT.getValue())) { + session.transfer(input, REL_FAILURE); + } else if (input != null && operatingMode.equals(MODE_MANY_COMMITS.getValue())) { + final String queryPtr = queryStr; + FlowFile cloned = session.create(); + cloned = session.putAllAttributes(cloned, originalAttributes); + cloned = session.write(cloned, out -> out.write(queryPtr.getBytes())); + session.transfer(cloned, REL_FAILURE); + } + } + } + + private void handleFile(GridFSBucket bucket, ProcessSession session, ProcessContext context, FlowFile parent, GridFSFile input, String query) { + Map attrs = new HashMap<>(); + attrs.put(METADATA_ATTRIBUTE, input.getMetadata() != null ? input.getMetadata().toJson() : "{}"); + if (context.getProperty(QUERY_ATTRIBUTE).isSet()) { + String key = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(parent).getValue(); + attrs.put(key, query); + } + attrs.put(CoreAttributes.FILENAME.key(), input.getFilename()); + FlowFile output = parent != null ? 
session.create(parent) : session.create(); + output = session.write(output, out -> bucket.downloadToStream(input.getObjectId(), out)); + output = session.putAllAttributes(output, attrs); + session.transfer(output, REL_SUCCESS); + session.getProvenanceReporter().receive(output, getTransitUri(input.getObjectId(), output, context)); + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java new file mode 100644 index 0000000000..be9dd46a39 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb.gridfs; + +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; +import org.bson.Document; +import org.bson.types.ObjectId; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"mongo", "gridfs", "put", "file", "store"}) +@CapabilityDescription("Writes a file to a GridFS bucket.") +public class PutGridFS extends AbstractGridFSProcessor { + + static final PropertyDescriptor PROPERTIES_PREFIX = new PropertyDescriptor.Builder() + .name("putgridfs-properties-prefix") + .displayName("File Properties Prefix") + .description("Attributes that have this prefix will be added to the file stored in GridFS as metadata.") 
+ .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(Validator.VALID) + .build(); + + static final AllowableValue NO_UNIQUE = new AllowableValue("none", "None", "No uniqueness will be enforced."); + static final AllowableValue UNIQUE_NAME = new AllowableValue("name", "Name", "Only the filename must " + "be unique."); + static final AllowableValue UNIQUE_HASH = new AllowableValue("hash", "Hash", "Only the file hash must be " + "unique."); + static final AllowableValue UNIQUE_BOTH = new AllowableValue("both", "Both", "Both the filename and hash " + "must be unique."); + + static final PropertyDescriptor ENFORCE_UNIQUENESS = new PropertyDescriptor.Builder() + .name("putgridfs-enforce-uniqueness") + .displayName("Enforce Uniqueness") + .description("When enabled, this option will ensure that uniqueness is enforced on the bucket. It will do so by creating a MongoDB index " + "that matches your selection. It should ideally be configured once when the bucket is created for the first time because " + "it could take a long time to build on an existing bucket with a lot of data.") + .allowableValues(NO_UNIQUE, UNIQUE_BOTH, UNIQUE_NAME, UNIQUE_HASH) + .defaultValue(NO_UNIQUE.getValue()) + .required(true) + .build(); + static final PropertyDescriptor HASH_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("putgridfs-hash-attribute") + .displayName("Hash Attribute") + .description("If uniqueness enforcement is enabled and the file hash is part of the constraint, this must be set to an attribute that " + "exists on all incoming flowfiles.") + .defaultValue("hash.value") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder() + .name("putgridfs-chunk-size") + .displayName("Chunk Size") + .description("Controls the maximum size of each chunk of a file uploaded into GridFS.") + .defaultValue("256 KB") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder() + .name("gridfs-file-name") + .displayName("File Name") + .description("The name of the file in the bucket that is the target of this processor. 
GridFS file names do not " + + "include path information because GridFS does not sort files into folders within a bucket.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final Relationship REL_DUPLICATE = new Relationship.Builder() + .name("duplicate") + .description("Flowfiles that fail the duplicate check are sent to this relationship.") + .build(); + + static final String ID_ATTRIBUTE = "gridfs.id"; + + static final List DESCRIPTORS; + static final Set RELATIONSHIP_SET; + + static { + List _temp = new ArrayList<>(); + _temp.addAll(PARENT_PROPERTIES); + _temp.add(FILE_NAME); + _temp.add(PROPERTIES_PREFIX); + _temp.add(ENFORCE_UNIQUENESS); + _temp.add(HASH_ATTRIBUTE); + _temp.add(CHUNK_SIZE); + DESCRIPTORS = Collections.unmodifiableList(_temp); + + Set _rels = new HashSet(); + _rels.addAll(PARENT_RELATIONSHIPS); + _rels.add(REL_DUPLICATE); + RELATIONSHIP_SET = Collections.unmodifiableSet(_rels); + } + + private String uniqueness; + private String hashAttribute; + + @OnScheduled + public void onScheduled(ProcessContext context) { + this.uniqueness = context.getProperty(ENFORCE_UNIQUENESS).getValue(); + this.hashAttribute = context.getProperty(HASH_ATTRIBUTE).evaluateAttributeExpressions().getValue(); + this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class); + } + + @Override + public Set getRelationships() { + return RELATIONSHIP_SET; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile input = session.get(); + if (input == null) { + return; + } + + GridFSBucket bucket = getBucket(input, context); + + if (!canUploadFile(context, input, bucket.getBucketName())) { + getLogger().error("Cannot upload the file because of the uniqueness policy configured."); + session.transfer(input, REL_DUPLICATE); + return; + } + + final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue(); + + try (InputStream fileInput = session.read(input)) { + String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue(); + GridFSUploadOptions options = new GridFSUploadOptions() + .chunkSizeBytes(chunkSize) + .metadata(getMetadata(input, context)); + ObjectId id = bucket.uploadFromStream(fileName, fileInput, options); + fileInput.close(); + + if (id != null) { + input = session.putAttribute(input, ID_ATTRIBUTE, id.toString()); + session.transfer(input, REL_SUCCESS); + session.getProvenanceReporter().send(input, getTransitUri(id, input, context)); + } else { + getLogger().error("ID was null, assuming failure."); + session.transfer(input, REL_FAILURE); + } + } catch (Exception ex) { + getLogger().error("Failed to upload file", ex); + session.transfer(input, REL_FAILURE); + } + } + + private boolean canUploadFile(ProcessContext context, FlowFile input, String bucketName) { + boolean retVal; + + if (uniqueness.equals(NO_UNIQUE.getValue())) { + retVal = true; + } else { + final String fileName = input.getAttribute(CoreAttributes.FILENAME.key()); + final String fileColl = String.format("%s.files", bucketName); + final String hash = input.getAttribute(hashAttribute); + + if ((uniqueness.equals(UNIQUE_BOTH.getValue()) || uniqueness.equals(UNIQUE_HASH.getValue())) && StringUtils.isEmpty(hash)) 
{ + throw new RuntimeException(String.format("Uniqueness mode %s was set and the hash attribute %s was not found.", uniqueness, hashAttribute)); + } + + Document query; + if (uniqueness.equals(UNIQUE_BOTH.getValue())) { + query = new Document().append("filename", fileName).append("md5", hash); + } else if (uniqueness.equals(UNIQUE_HASH.getValue())) { + query = new Document().append("md5", hash); + } else { + query = new Document().append("filename", fileName); + } + + retVal = getDatabase(input, context).getCollection(fileColl).count(query) == 0; + } + + return retVal; + } + + private Document getMetadata(FlowFile input, ProcessContext context) { + final String prefix = context.getProperty(PROPERTIES_PREFIX).evaluateAttributeExpressions(input).getValue(); + Document doc; + + if (StringUtils.isEmpty(prefix)) { + doc = Document.parse("{}"); + } else { + doc = new Document(); + Map attributes = input.getAttributes(); + for (Map.Entry entry : attributes.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + String cleanPrefix = prefix.endsWith(".") ? prefix : String.format("%s.", prefix); + String cleanKey = entry.getKey().replace(cleanPrefix, ""); + doc.append(cleanKey, entry.getValue()); + } + } + } + + return doc; + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index bfe2b74095..3797ca0621 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,4 +18,7 @@ org.apache.nifi.processors.mongodb.GetMongo org.apache.nifi.processors.mongodb.GetMongoRecord org.apache.nifi.processors.mongodb.RunMongoAggregation org.apache.nifi.processors.mongodb.PutMongo -org.apache.nifi.processors.mongodb.PutMongoRecord \ No newline at end of file +org.apache.nifi.processors.mongodb.PutMongoRecord +org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS +org.apache.nifi.processors.mongodb.gridfs.FetchGridFS +org.apache.nifi.processors.mongodb.gridfs.PutGridFS \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html new file mode 100644 index 0000000000..b748755c3c --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html @@ -0,0 +1,32 @@ + + + + + + DeleteGridFS + + + + + +

Description:

+

+ This processor retrieves one or more files from GridFS. The query to execute can be either provided in the query + configuration parameter or generated from the value pulled from the filename configuration parameter. Upon successful + execution, it will append the query that was executed as an attribute on the flowfile that was processed. +

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html new file mode 100644 index 0000000000..279216c472 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html @@ -0,0 +1,43 @@ + + + + + + FetchGridFS + + + + + +

Description:

+

+ This processor retrieves one or more files from GridFS. The query can be provided in one of three ways: +

+ +
    +
  • Query configuration parameter.
  • +
  • Built for you by configuring the filename parameter. (Note: this is just a filename, Mongo queries cannot be + embedded in the field).
  • +
  • Retrieving the query from the flowfile contents.
  • +
+ +

+ The processor can also be configured to either commit only once at the end of a fetch operation or after each file + that is retrieved. Multiple commits is generally only necessary when retrieving a lot of data from GridFS as measured + in total data size, not file count, to ensure that the disks NiFi is using are not overloaded. +

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html new file mode 100644 index 0000000000..62330dd0da --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html @@ -0,0 +1,58 @@ + + + + + + PutGridFS + + + + + +

Description:

+

+ This processor puts a file with one or more user-defined metadata values into GridFS in the configured bucket. It + allows the user to define how big each file chunk will be during ingestion and provides some ability to intelligently + attempt to enforce file uniqueness using filename or hash values instead of just relying on a database index. +

+    <h3>GridFS File Attributes</h3>
+    <p>
+        PutGridFS allows flowfile attributes that start with a configured prefix to be added to the GridFS
+        document as metadata. This can be very useful later when working with GridFS because it provides metadata
+        about the file.
+    </p>
+    <h3>Chunk Size</h3>
+    <p>
+        GridFS splits files into chunks, stored as Mongo documents, as each file is ingested into the database. The
+        Chunk Size configuration property sets the maximum size of each chunk. This property should be left at its
+        default value unless there is a specific business case to increase or decrease it.
+    </p>
+    <h3>Uniqueness Enforcement</h3>
+    <p>
+        There are four operating modes:
+    </p>
+    <ul>
+        <li>No enforcement at the application level.</li>
+        <li>Enforce by unique file name.</li>
+        <li>Enforce by unique hash value.</li>
+        <li>Use both hash and file name.</li>
+    </ul>
+    <p>
+        By default the hash value is taken from the attribute <em>hash.value</em>, which can be generated by configuring
+        a <em>HashContent</em> processor upstream of PutGridFS. Both this option and the name option use a query against
+        the existing data to see whether a file matching those criteria exists before attempting to write the flowfile
+        contents.
+    </p>
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java new file mode 100644 index 0000000000..e006ecbb0d --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb.gridfs; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.types.ObjectId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DeleteGridFSIT extends GridFSITTestBase { + private TestRunner runner; + private static final String BUCKET = "delete_test_bucket"; + + @Before + public void setup() throws Exception { + runner = TestRunners.newTestRunner(DeleteGridFS.class); + super.setup(runner, BUCKET, false); + } + + @After + public void tearDown() { + super.tearDown(); + } + + @Test + public void testFileAndQueryAtSameTime() { + runner.setProperty(DeleteGridFS.FILE_NAME, "${test_var}"); + runner.setProperty(DeleteGridFS.QUERY, "{}"); + runner.assertNotValid(); + } + + @Test + public void testNeitherFileNorQuery() { + runner.assertNotValid(); + } + + @Test + public void testDeleteByFileName() { + testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), setupTestFile()); + } + + @Test + public void testDeleteByQuery() { + testDeleteByProperty(DeleteGridFS.QUERY, "{}", setupTestFile()); + } + + @Test + public void testQueryAttribute() { + String attrName = "gridfs.query.used"; + String fileName = setupTestFile(); + runner.setProperty(DeleteGridFS.QUERY_ATTRIBUTE, attrName); + testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), fileName); + testForQueryAttribute(fileName, attrName); + } + + private void testForQueryAttribute(String mustContain, String attrName) { + List flowFiles = runner.getFlowFilesForRelationship(DeleteGridFS.REL_SUCCESS); + String attribute = flowFiles.get(0).getAttribute(attrName); + Assert.assertTrue(attribute.contains(mustContain)); + } + + private String setupTestFile() { + String fileName = "simple-delete-test.txt"; + ObjectId id = writeTestFile(fileName, 
"Hello, world!", BUCKET, new HashMap<>()); + Assert.assertNotNull(id); + + return fileName; + } + + private void testDeleteByProperty(PropertyDescriptor descriptor, String value, String fileName) { + Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.FILENAME.key(), fileName); + runner.setProperty(descriptor, value); + runner.assertValid(); + runner.enqueue("test", attrs); + runner.run(); + + runner.assertTransferCount(DeleteGridFS.REL_FAILURE, 0); + runner.assertTransferCount(DeleteGridFS.REL_SUCCESS, 1); + + Assert.assertFalse(String.format("File %s still exists.", fileName), fileExists(fileName, BUCKET)); + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java new file mode 100644 index 0000000000..5ce4ff3358 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.nifi.processors.mongodb.gridfs; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.mongodb.QueryHelper; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.bson.types.ObjectId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FetchGridFSIT extends GridFSITTestBase { + TestRunner runner; + + static final String BUCKET = "get_test_bucket"; + + @Before + public void setup() throws Exception { + runner = TestRunners.newTestRunner(FetchGridFS.class); + super.setup(runner, BUCKET, false); + } + + @After + public void tearDown() { + super.tearDown(); + } + + @Test + public void testGetOneByName() { + final String fileName = "get_by_name.txt"; + final String content = "Hello, world"; + ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>()); + Assert.assertNotNull(id); + + String query = String.format("{\"filename\": \"%s\"}", fileName); + runner.enqueue(query); + runner.run(); + runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1); + runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS); + byte[] rawData = runner.getContentAsByteArray(flowFiles.get(0)); + Assert.assertEquals("Data did not match for the file", new String(rawData), content); + + runner.clearTransferState(); + runner.setProperty(FetchGridFS.QUERY, query); + runner.enqueue("test"); + runner.run(); + + runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1); + runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1); + flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS); + rawData = runner.getContentAsByteArray(flowFiles.get(0)); + Assert.assertEquals("Data did not match for the file", new String(rawData), content); + } + + @Test + public void testGetMany() { + String baseName = "test_file_%d.txt"; + String content = "Hello, world take %d"; + for (int index = 0; index < 5; index++) { + ObjectId id = writeTestFile(String.format(baseName, index), String.format(content, index), BUCKET, new HashMap<>()); + Assert.assertNotNull(id); + } + + AllowableValue[] values = new AllowableValue[] { QueryHelper.MODE_MANY_COMMITS, QueryHelper.MODE_ONE_COMMIT }; + + for (AllowableValue value : values) { + String query = "{}"; + runner.setProperty(FetchGridFS.OPERATION_MODE, value); + runner.enqueue(query); + runner.run(); + + runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1); + runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 5); + runner.clearTransferState(); + } + } + + @Test + public void testQueryAttribute() { + final String fileName = "get_by_name.txt"; + final String content = "Hello, world"; + ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>()); + Assert.assertNotNull(id); + + final String queryAttr = "gridfs.query.used"; + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.FILENAME.key(), fileName); + runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key())); + runner.setProperty(FetchGridFS.QUERY_ATTRIBUTE, queryAttr); + 
runner.enqueue(content, attrs); + runner.run(); + + runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1); + runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1); + MockFlowFile mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0); + String attr = mff.getAttribute(queryAttr); + Assert.assertNotNull("Query attribute was null.", attr); + Assert.assertTrue("Wrong content.", attr.contains("filename")); + + runner.clearTransferState(); + + id = writeTestFile(fileName, content, BUCKET, new HashMap(){{ + put("lookupKey", "xyz"); + }}); + Assert.assertNotNull(id); + + String query = "{ \"metadata\": { \"lookupKey\": \"xyz\" }}"; + + runner.removeProperty(FetchGridFS.FILE_NAME); + runner.setProperty(FetchGridFS.QUERY, query); + runner.enqueue(content, attrs); + runner.run(); + runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0); + runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1); + runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1); + mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0); + attr = mff.getAttribute(queryAttr); + Assert.assertNotNull("Query attribute was null.", attr); + Assert.assertTrue("Wrong content.", attr.contains("metadata")); + } + + @Test + public void testGetQueryFromBody() { + runner.enqueue("{}"); + testQueryFromSource(0, 1, 1); + } + + @Test + public void testGetQueryFromQueryParam() { + runner.setProperty(FetchGridFS.QUERY, "{}"); + runner.enqueue(""); + testQueryFromSource(0, 1, 1); + } + + @Test + public void testGetQueryFromFileNameParam() { + Map attr = new HashMap<>(); + attr.put(CoreAttributes.FILENAME.key(), "get_by_name.txt"); + runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key())); + runner.enqueue("test", attr); + testQueryFromSource(0, 1, 1); + } + + private void testQueryFromSource(int failure, int original, int success) { + final String fileName = "get_by_name.txt"; + final String content = "Hello, world"; + ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>()); + Assert.assertNotNull(id); + + runner.run(); + runner.assertTransferCount(FetchGridFS.REL_FAILURE, failure); + runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, original); + runner.assertTransferCount(FetchGridFS.REL_SUCCESS, success); + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java new file mode 100644 index 0000000000..45e7cb244b --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.mongodb.gridfs; + +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; +import org.apache.nifi.mongodb.MongoDBClientService; +import org.apache.nifi.mongodb.MongoDBControllerService; +import org.apache.nifi.util.TestRunner; +import org.bson.Document; +import org.bson.types.ObjectId; + +import java.io.ByteArrayInputStream; +import java.util.Map; + +public class GridFSITTestBase { + static final String URI = "mongodb://localhost:27017"; + static final String DB = "gridfs_test_database"; + MongoClient client; + + public void setup(TestRunner runner, String bucketName) throws Exception { + setup(runner, bucketName, true); + } + + public void setup(TestRunner runner, String bucketName, boolean validate) throws Exception { + MongoDBClientService clientService = new MongoDBControllerService(); + runner.addControllerService("clientService", clientService); + runner.setProperty(AbstractGridFSProcessor.CLIENT_SERVICE, "clientService"); + runner.setProperty(clientService, MongoDBControllerService.URI, URI); + runner.setProperty(AbstractGridFSProcessor.BUCKET_NAME, bucketName); + runner.setProperty(AbstractGridFSProcessor.DATABASE_NAME, DB); + runner.enableControllerService(clientService); + runner.setValidateExpressionUsage(true); + if (validate) { + runner.assertValid(); + } + + client = new MongoClient("localhost", 27017); + } + public void tearDown() { + client.dropDatabase(DB); + client.close(); + } + + public boolean fileExists(String name, String bucketName) { + GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName); + MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator(); + boolean retVal = it.hasNext(); + it.close(); + + return retVal; + } + + public ObjectId writeTestFile(String fileName, String content, String bucketName, Map attrs) { + GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName); + GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document(attrs)); + ByteArrayInputStream input = new ByteArrayInputStream(content.getBytes()); + ObjectId retVal = bucket.uploadFromStream(fileName, input, options); + + return retVal; + } + + public boolean fileHasProperties(String name, String bucketName, Map attrs) { + GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName); + MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator(); + boolean retVal = false; + + if (it.hasNext()) { + GridFSFile file = (GridFSFile)it.next(); + Document metadata = file.getMetadata(); + if (metadata != null && metadata.size() == attrs.size()) { + retVal = true; + for (Map.Entry entry : metadata.entrySet()) { + Object val = attrs.get(entry.getKey()); + if (val == null || !entry.getValue().equals(val)) { + retVal = false; + break; + } + } + } + } + + 
+        it.close();
+
+        return retVal;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java
new file mode 100644
index 0000000000..dfd7ae093d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PutGridFSIT extends GridFSITTestBase {
+    TestRunner runner;
+
+    static final String BUCKET = "put_test_bucket";
+
+    @Before
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(PutGridFS.class);
+        runner.setProperty(PutGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
+        super.setup(runner, BUCKET);
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    @Test
+    public void testSimplePut() {
+        final String fileName = "simple_test.txt";
+        Map attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+
+        runner.enqueue("12345", attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
+
+        Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
+    }
+
+    @Test
+    public void testWithProperties() {
+        final String fileName = "simple_test_props.txt";
+        Map attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        attrs.put("prop.created_by", "john.smith");
+        attrs.put("prop.created_for", "jane.doe");
+        attrs.put("prop.restrictions", "PHI&PII");
+        attrs.put("prop.department", "Accounting");
+
+        runner.setProperty(PutGridFS.PROPERTIES_PREFIX, "prop");
+        runner.enqueue("12345", attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
+
+        attrs = new HashMap(){{
+            put("created_by", "john.smith");
+            put("created_for", "jane.doe");
+            put("restrictions", "PHI&PII");
+            put("department", "Accounting");
+        }};
+
+        Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
+        Assert.assertTrue("File is missing PARENT_PROPERTIES", fileHasProperties(fileName, BUCKET, attrs));
+    }
+
+    @Test
+    public void testNoUniqueness() {
+        String fileName = "test_duplicates.txt";
+        Map attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+
+        for (int x = 0; x < 10; x++) {
+            runner.enqueue("Duplicates are ok.", attrs);
+            runner.run();
+        }
+
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 10);
+
+        String bucketName = String.format("%s.files", BUCKET);
+        MongoCollection files = client.getDatabase(DB).getCollection(bucketName);
+        Document query = Document.parse(String.format("{\"filename\": \"%s\"}", fileName));
+        long count = files.count(query);
+        Assert.assertTrue("Wrong count", count == 10);
+    }
+
+    @Test
+    public void testFileNameUniqueness() {
+        String fileName = "test_duplicates.txt";
+        Map attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        testUniqueness(attrs, "Hello, world", PutGridFS.UNIQUE_NAME);
+    }
+
+    @Test
+    public void testFileNameAndHashUniqueness() {
+        testHashUniqueness(PutGridFS.UNIQUE_BOTH);
+    }
+
+    @Test
+    public void testHashUniqueness() {
+        testHashUniqueness(PutGridFS.UNIQUE_HASH);
+    }
+
+
+    @Test
+    public void testChunkSize() {
+        String[] chunkSizes = new String[] { "128 KB", "256 KB", "384 KB", "512KB", "768KB", "1024 KB" };
+        StringBuilder sb = new StringBuilder();
+        for (int x = 0; x < 10000; x++) {
+            sb.append("This is a test string used to build up a largish text file.");
+        }
+        final String testData = sb.toString();
+        final Map attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), "big-putgridfs-test-file.txt");
+
+        for (String chunkSize : chunkSizes) {
+            runner.setProperty(PutGridFS.CHUNK_SIZE, chunkSize);
+            runner.enqueue(testData, attrs);
+            runner.run();
+            runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+            runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
+            runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+
+            runner.clearTransferState();
+        }
+
+        runner.setProperty(PutGridFS.CHUNK_SIZE, "${gridfs.chunk.size}");
+        attrs.put("gridfs.chunk.size", "768 KB");
+        runner.enqueue(testData, attrs);
+        runner.run();
+        runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+    }
+
+    private void testHashUniqueness(AllowableValue value) {
+        String hashAttr = "hash.value";
+        String fileName = "test_duplicates.txt";
+        String content = "Hello, world";
+        String hash = DigestUtils.md5Hex(content);
+        Map attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        attrs.put(hashAttr, hash);
+        testUniqueness(attrs, content, value);
+    }
+
+    private void testUniqueness(Map attrs, String content, AllowableValue param) {
+        runner.setProperty(PutGridFS.ENFORCE_UNIQUENESS, param);
+        for (int x = 0; x < 5; x++) {
+            runner.enqueue(content, attrs);
+            runner.run();
+        }
+
+        runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 4);
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index c731fe483c..6fdebf574b 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -40,8 +40,8 @@ import java.util.List;
 
 @Tags({"mongo", "mongodb", "service"})
 @CapabilityDescription(
-        "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
-        "other Mongo-related components."
+    "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
+    "other Mongo-related components."
 )
 public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
     private String uri;
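
Note on the integration tests above: the helpers in GridFSITTestBase verify only that a stored file exists and that its metadata matches, not that its bytes round-tripped correctly. A content check could be built on the driver's GridFSBucket.downloadToStream(). The sketch below is not part of this patch: it assumes the same client and DB fields and bucket naming used above, a driver version that exposes downloadToStream(String, OutputStream), and the helper name fileHasContent is hypothetical.

    // Hypothetical helper for GridFSITTestBase (not in this patch). Uses the existing
    // 'client' and 'DB' fields; additionally needs java.io.ByteArrayOutputStream and
    // java.nio.charset.StandardCharsets imports on top of the GridFS imports already present.
    public boolean fileHasContent(String name, String bucketName, String expected) {
        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // downloadToStream(filename, stream) copies the newest revision's bytes into the stream.
        bucket.downloadToStream(name, out);
        return expected.equals(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }

With a helper like that, testSimplePut above could additionally assert fileHasContent(fileName, BUCKET, "12345") alongside the existing fileExists check.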