NIFI-4975 Add GridFS processors

NIFI-4975 Added changes requested in a code review.
NIFI-4975 Reverted some base Mongo changes.
NIFI-4975 Moved connection configuration to using Mongo client service.
NIFI-4975 Fixed a lot of style issues.
NIFI-4975 Removed an EL statement that was causing problems with the UI.
NIFI-4975 Added changes from code review.
NIFI-4975 Added additional details for FetchGridFS.
NIFI-4975 Added documentation for DeleteGridFS.
NIFI-4975 Added documentation for PutGridFS.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2546
Mike Thomsen 2018-03-10 12:57:28 -05:00 committed by Matthew Burgess
parent 72244d09ff
commit 033b2a1940
15 changed files with 1587 additions and 20 deletions


@@ -121,24 +121,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ "connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ "connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("ssl-client-auth")
.displayName("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ "has been defined and enabled.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue("REQUIRED")
.build();
.name("ssl-client-auth")
.displayName("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ "has been defined and enabled.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue("REQUIRED")
.build();
public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
.name("Write Concern")
@@ -341,7 +341,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
}
protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session,
Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException {
String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue();
FlowFile flowFile = parent != null ? session.create(parent) : session.create();


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public interface QueryHelper {
AllowableValue MODE_ONE_COMMIT = new AllowableValue("all-at-once", "Full Query Fetch",
"Fetch the entire query result and then make it available to downstream processors.");
AllowableValue MODE_MANY_COMMITS = new AllowableValue("streaming", "Stream Query Results",
"As soon as the query start sending results to the downstream processors at regular intervals.");
PropertyDescriptor OPERATION_MODE = new PropertyDescriptor.Builder()
.name("mongo-operation-mode")
.displayName("Operation Mode")
.allowableValues(MODE_ONE_COMMIT, MODE_MANY_COMMITS)
.defaultValue(MODE_ONE_COMMIT.getValue())
.required(true)
.description("This option controls when results are made available to downstream processors. If Stream Query Results is enabled, " +
"provenance will not be tracked relative to the input flowfile if an input flowfile is received and starts the query. In Stream Query Results mode " +
"errors will be handled by sending a new flowfile with the original content and attributes of the input flowfile to the failure " +
"relationship. Streaming should only be used if there is reliable connectivity between MongoDB and NiFi.")
.addValidator(Validator.VALID)
.build();
default String readQuery(ProcessContext context, ProcessSession session, PropertyDescriptor queryProp, FlowFile input) throws IOException {
String queryStr;
if (context.getProperty(queryProp).isSet()) {
queryStr = context.getProperty(queryProp).evaluateAttributeExpressions(input).getValue();
} else if (!context.getProperty(queryProp).isSet() && input == null) {
queryStr = "{}";
} else {
ByteArrayOutputStream out = new ByteArrayOutputStream();
session.exportTo(input, out);
out.close();
queryStr = new String(out.toByteArray());
}
return queryStr;
}
}
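A minimal usage sketch (not part of this commit; the processor and property names are hypothetical) of how a processor can mix in QueryHelper: readQuery takes the query from the Query property when it is set, falls back to an empty query when there is neither a property value nor an input flowfile, and otherwise reads the query from the flowfile content.

package org.apache.nifi.processors.mongodb;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class ExampleQueryProcessor extends AbstractProcessor implements QueryHelper {
    // Hypothetical query property; JsonValidator is the same validator the GridFS processors use.
    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
            .name("example-query")
            .displayName("Query")
            .description("A valid MongoDB query.")
            .addValidator(JsonValidator.INSTANCE)
            .required(false)
            .build();
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(QUERY);
    }
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile input = session.get();
        try {
            // Property first, then "{}" when there is no property and no input,
            // otherwise the incoming flowfile content.
            String query = readQuery(context, session, QUERY, input);
            getLogger().info("Resolved query: " + query);
            if (input != null) {
                session.remove(input);
            }
        } catch (IOException e) {
            throw new ProcessException(e);
        }
    }
}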


@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import org.bson.types.ObjectId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public abstract class AbstractGridFSProcessor extends AbstractProcessor {
static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("gridfs-client-service")
.displayName("Client Service")
.description("The MongoDB client service to use for database connections.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.identifiesControllerService(MongoDBClientService.class)
.build();
static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("gridfs-database-name")
.displayName("Mongo Database Name")
.description("The name of the database to use")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder()
.name("gridfs-bucket-name")
.displayName("Bucket Name")
.description("The GridFS bucket where the files will be stored. If left blank, it will use the default value 'fs' " +
"that the MongoDB client driver uses.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(Validator.VALID)
.build();
static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("gridfs-file-name")
.displayName("File Name")
.description("The name of the file in the bucket that is the target of this processor.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("mongo-query-attribute")
.displayName("Query Output Attribute")
.description("If set, the query will be written to a specified attribute on the output flowfiles.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.required(false)
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("When there is a failure processing the flowfile, it goes to this relationship.")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("When the operation succeeds, the flowfile is sent to this relationship.")
.build();
static final List<PropertyDescriptor> PARENT_PROPERTIES;
static final Set<Relationship> PARENT_RELATIONSHIPS;
protected volatile MongoDBClientService clientService;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.add(CLIENT_SERVICE);
_temp.add(DATABASE_NAME);
_temp.add(BUCKET_NAME);
PARENT_PROPERTIES = Collections.unmodifiableList(_temp);
Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
PARENT_RELATIONSHIPS = Collections.unmodifiableSet(_rels);
}
protected MongoDatabase getDatabase(FlowFile input, ProcessContext context) {
return clientService.getDatabase(context.getProperty(DATABASE_NAME)
.evaluateAttributeExpressions(input)
.getValue());
}
protected GridFSBucket getBucket(FlowFile input, ProcessContext context) {
final String name = getBucketName(input, context);
if (StringUtils.isEmpty(name)) {
return GridFSBuckets.create(getDatabase(input, context));
} else {
return GridFSBuckets.create(getDatabase(input, context), name);
}
}
protected String getBucketName(FlowFile input, ProcessContext context) {
return context.getProperty(BUCKET_NAME).isSet()
? context.getProperty(BUCKET_NAME).evaluateAttributeExpressions(input).getValue()
: null;
}
protected String getTransitUri(ObjectId id, FlowFile input, ProcessContext context) {
String bucket = getBucketName(input, context);
String uri = clientService.getURI();
return new StringBuilder()
.append(uri)
.append(uri.endsWith("/") ? "" : "/")
.append(bucket)
.append("/")
.append(id.toString())
.toString();
}
}
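For reference, getTransitUri above joins the client service URI, the bucket name, and the file's ObjectId, so the transit URI reported to provenance looks roughly like the following (assuming a local MongoDB, a bucket named fs, and a made-up ObjectId):

mongodb://localhost:27017/fs/5aa3e1a0b1f4d60001a2b3c4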


@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.model.GridFSFile;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@CapabilityDescription("Deletes a file from GridFS using a file name or a query.")
@Tags({"gridfs", "delete", "mongodb"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class DeleteGridFS extends AbstractGridFSProcessor {
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("delete-gridfs-query")
.displayName("Query")
.description("A valid MongoDB query to use to find and delete one or more files from GridFS.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.required(false)
.build();
static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("gridfs-file-name")
.displayName("File Name")
.description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
"include path information because GridFS does not sort files into folders within a bucket.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final List<PropertyDescriptor> DESCRIPTORS;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.addAll(PARENT_PROPERTIES);
_temp.add(FILE_NAME);
_temp.add(QUERY);
_temp.add(QUERY_ATTRIBUTE);
DESCRIPTORS = Collections.unmodifiableList(_temp);
}
@Override
public Set<Relationship> getRelationships() {
return new HashSet<>(PARENT_RELATIONSHIPS);
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
ArrayList<ValidationResult> problems = new ArrayList<>();
boolean fileName = validationContext.getProperty(FILE_NAME).isSet();
boolean query = validationContext.getProperty(QUERY).isSet();
if (fileName && query) {
problems.add(new ValidationResult.Builder()
.valid(false)
.explanation("File name and Query cannot be set at the same time.")
.build()
);
} else if (!fileName && !query) {
problems.add(new ValidationResult.Builder()
.valid(false)
.explanation("File name or Query must be set, but not both at the same time.")
.build()
);
}
return problems;
}
private String getQuery(ProcessContext context, FlowFile input) {
String queryString;
if (context.getProperty(FILE_NAME).isSet()) {
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
queryString = String.format("{ \"filename\": \"%s\"}", fileName);
} else {
queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
}
return queryString;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
final String deleteQuery = getQuery(context, input);
final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
: null;
GridFSBucket bucket = getBucket(input, context);
try {
Document query = Document.parse(deleteQuery);
MongoCursor cursor = bucket.find(query).iterator();
if (cursor.hasNext()) {
GridFSFile file = (GridFSFile)cursor.next();
bucket.delete(file.getObjectId());
if (!StringUtils.isEmpty(queryAttribute)) {
input = session.putAttribute(input, queryAttribute, deleteQuery);
}
session.transfer(input, REL_SUCCESS);
} else {
getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
session.transfer(input, REL_FAILURE);
}
cursor.close();
} catch (Exception ex) {
getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
session.transfer(input, REL_FAILURE);
}
}
}


@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.model.GridFSFile;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processors.mongodb.QueryHelper;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes(
@WritesAttribute(attribute = "gridfs.file.metadata", description = "The custom metadata stored with a file is attached to this property if it exists.")
)
@Tags({"fetch", "gridfs", "mongo"})
@CapabilityDescription("Retrieves one or more files from a GridFS bucket by file name or by a user-defined query.")
public class FetchGridFS extends AbstractGridFSProcessor implements QueryHelper {
static final String METADATA_ATTRIBUTE = "gridfs.file.metadata";
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("gridfs-query")
.displayName("Query")
.description("A valid MongoDB query to use to fetch one or more files from GridFS.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
.required(false)
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original input flowfile goes to this relationship if the query does not cause an error")
.build();
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static final Set<Relationship> RELATIONSHIP_SET;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.addAll(PARENT_PROPERTIES);
_temp.add(FILE_NAME);
_temp.add(QUERY);
_temp.add(QUERY_ATTRIBUTE);
_temp.add(OPERATION_MODE);
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_temp);
Set<Relationship> _rels = new HashSet<>();
_rels.addAll(PARENT_RELATIONSHIPS);
_rels.add(REL_ORIGINAL);
RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIP_SET;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
private String getQuery(ProcessSession session, ProcessContext context, FlowFile input) throws IOException {
String queryString;
if (context.getProperty(FILE_NAME).isSet()) {
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
queryString = String.format("{ \"filename\": \"%s\"}", fileName);
} else if (context.getProperty(QUERY).isSet()) {
queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
} else {
ByteArrayOutputStream out = new ByteArrayOutputStream();
session.exportTo(input, out);
out.close();
queryString = new String(out.toByteArray());
}
return queryString;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
final String operatingMode = context.getProperty(OPERATION_MODE).getValue();
final Map<String, String> originalAttributes = input.getAttributes();
String queryStr;
try {
queryStr = getQuery(session, context, input);
if (StringUtils.isEmpty(queryStr)) {
getLogger().error("No query could be found or built from the supplied input.");
session.transfer(input, REL_FAILURE);
return;
}
} catch (IOException ex) {
getLogger().error("No query could be found from supplied input", ex);
session.transfer(input, REL_FAILURE);
return;
}
Document query = Document.parse(queryStr);
try {
final GridFSBucket bucket = getBucket(input, context);
final String queryPtr = queryStr;
final FlowFile parent = operatingMode.equals(MODE_ONE_COMMIT.getValue()) ? input : null;
MongoCursor it = bucket.find(query).iterator();
if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
session.transfer(input, REL_ORIGINAL);
input = null;
}
while (it.hasNext()) {
GridFSFile gridFSFile = (GridFSFile)it.next();
handleFile(bucket, session, context, parent, gridFSFile, queryPtr);
if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
session.commit();
}
}
if (input != null) {
session.transfer(input, REL_ORIGINAL);
}
} catch (Exception ex) {
getLogger().error("An error occurred wile trying to run the query.", ex);
if (input != null && operatingMode.equals(MODE_ONE_COMMIT.getValue())) {
session.transfer(input, REL_FAILURE);
} else if (input != null && operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
final String queryPtr = queryStr;
FlowFile cloned = session.create();
cloned = session.putAllAttributes(cloned, originalAttributes);
cloned = session.write(cloned, out -> out.write(queryPtr.getBytes()));
session.transfer(cloned, REL_FAILURE);
}
}
}
private void handleFile(GridFSBucket bucket, ProcessSession session, ProcessContext context, FlowFile parent, GridFSFile input, String query) {
Map<String, String> attrs = new HashMap<>();
attrs.put(METADATA_ATTRIBUTE, input.getMetadata() != null ? input.getMetadata().toJson() : "{}");
if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
String key = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(parent).getValue();
attrs.put(key, query);
}
attrs.put(CoreAttributes.FILENAME.key(), input.getFilename());
FlowFile output = parent != null ? session.create(parent) : session.create();
output = session.write(output, out -> bucket.downloadToStream(input.getObjectId(), out));
output = session.putAllAttributes(output, attrs);
session.transfer(output, REL_SUCCESS);
session.getProvenanceReporter().receive(output, getTransitUri(input.getObjectId(), output, context));
}
}


@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"mongo", "gridfs", "put", "file", "store"})
@CapabilityDescription("Writes a file to a GridFS bucket.")
public class PutGridFS extends AbstractGridFSProcessor {
static final PropertyDescriptor PROPERTIES_PREFIX = new PropertyDescriptor.Builder()
.name("putgridfs-properties-prefix")
.displayName("File Properties Prefix")
.description("Attributes that have this prefix will be added to the file stored in GridFS as metadata.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(Validator.VALID)
.build();
static final AllowableValue NO_UNIQUE = new AllowableValue("none", "None", "No uniqueness will be enforced.");
static final AllowableValue UNIQUE_NAME = new AllowableValue("name", "Name", "Only the filename must " +
"be unique.");
static final AllowableValue UNIQUE_HASH = new AllowableValue("hash", "Hash", "Only the file hash must be " +
"unique.");
static final AllowableValue UNIQUE_BOTH = new AllowableValue("both", "Both", "Both the filename and hash " +
"must be unique.");
static final PropertyDescriptor ENFORCE_UNIQUENESS = new PropertyDescriptor.Builder()
.name("putgridfs-enforce-uniqueness")
.displayName("Enforce Uniqueness")
.description("When enabled, this option will ensure that uniqueness is enforced on the bucket. It will do so by creating a MongoDB index " +
"that matches your selection. It should ideally be configured once when the bucket is created for the first time because " +
"it could take a long time to build on an existing bucket wit a lot of data.")
.allowableValues(NO_UNIQUE, UNIQUE_BOTH, UNIQUE_NAME, UNIQUE_HASH)
.defaultValue(NO_UNIQUE.getValue())
.required(true)
.build();
static final PropertyDescriptor HASH_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("putgridfs-hash-attribute")
.displayName("Hash Attribute")
.description("If uniquness enforcement is enabled and the file hash is part of the constraint, this must be set to an attribute that " +
"exists on all incoming flowfiles.")
.defaultValue("hash.value")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
.name("putgridfs-chunk-size")
.displayName("Chunk Size")
.description("Controls the maximum size of each chunk of a file uploaded into GridFS.")
.defaultValue("256 KB")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("gridfs-file-name")
.displayName("File Name")
.description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
"include path information because GridFS does not sort files into folders within a bucket.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_DUPLICATE = new Relationship.Builder()
.name("duplicate")
.description("Flowfiles that fail the duplicate check are sent to this relationship.")
.build();
static final String ID_ATTRIBUTE = "gridfs.id";
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIP_SET;
static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.addAll(PARENT_PROPERTIES);
_temp.add(FILE_NAME);
_temp.add(PROPERTIES_PREFIX);
_temp.add(ENFORCE_UNIQUENESS);
_temp.add(HASH_ATTRIBUTE);
_temp.add(CHUNK_SIZE);
DESCRIPTORS = Collections.unmodifiableList(_temp);
Set<Relationship> _rels = new HashSet<>();
_rels.addAll(PARENT_RELATIONSHIPS);
_rels.add(REL_DUPLICATE);
RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
}
private String uniqueness;
private String hashAttribute;
@OnScheduled
public void onScheduled(ProcessContext context) {
this.uniqueness = context.getProperty(ENFORCE_UNIQUENESS).getValue();
this.hashAttribute = context.getProperty(HASH_ATTRIBUTE).evaluateAttributeExpressions().getValue();
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIP_SET;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = session.get();
if (input == null) {
return;
}
GridFSBucket bucket = getBucket(input, context);
if (!canUploadFile(context, input, bucket.getBucketName())) {
getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
session.transfer(input, REL_DUPLICATE);
return;
}
final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
try (InputStream fileInput = session.read(input)) {
String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
GridFSUploadOptions options = new GridFSUploadOptions()
.chunkSizeBytes(chunkSize)
.metadata(getMetadata(input, context));
ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
fileInput.close();
if (id != null) {
input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
session.transfer(input, REL_SUCCESS);
session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
} else {
getLogger().error("ID was null, assuming failure.");
session.transfer(input, REL_FAILURE);
}
} catch (Exception ex) {
getLogger().error("Failed to upload file", ex);
session.transfer(input, REL_FAILURE);
}
}
private boolean canUploadFile(ProcessContext context, FlowFile input, String bucketName) {
boolean retVal;
if (uniqueness.equals(NO_UNIQUE.getValue())) {
retVal = true;
} else {
final String fileName = input.getAttribute(CoreAttributes.FILENAME.key());
final String fileColl = String.format("%s.files", bucketName);
final String hash = input.getAttribute(hashAttribute);
if ((uniqueness.equals(UNIQUE_BOTH.getValue()) || uniqueness.equals(UNIQUE_HASH.getValue())) && StringUtils.isEmpty(hash)) {
throw new RuntimeException(String.format("Uniqueness mode %s was set and the hash attribute %s was not found.", uniqueness, hashAttribute));
}
Document query;
if (uniqueness.equals(UNIQUE_BOTH.getValue())) {
query = new Document().append("filename", fileName).append("md5", hash);
} else if (uniqueness.equals(UNIQUE_HASH.getValue())) {
query = new Document().append("md5", hash);
} else {
query = new Document().append("filename", fileName);
}
retVal = getDatabase(input, context).getCollection(fileColl).count(query) == 0;
}
return retVal;
}
private Document getMetadata(FlowFile input, ProcessContext context) {
final String prefix = context.getProperty(PROPERTIES_PREFIX).evaluateAttributeExpressions(input).getValue();
Document doc;
if (StringUtils.isEmpty(prefix)) {
doc = Document.parse("{}");
} else {
doc = new Document();
Map<String, String> attributes = input.getAttributes();
for (Map.Entry<String, String> entry : attributes.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
String cleanPrefix = prefix.endsWith(".") ? prefix : String.format("%s.", prefix);
String cleanKey = entry.getKey().replace(cleanPrefix, "");
doc.append(cleanKey, entry.getValue());
}
}
}
return doc;
}
}


@@ -18,4 +18,7 @@ org.apache.nifi.processors.mongodb.GetMongo
org.apache.nifi.processors.mongodb.GetMongoRecord
org.apache.nifi.processors.mongodb.RunMongoAggregation
org.apache.nifi.processors.mongodb.PutMongo
org.apache.nifi.processors.mongodb.PutMongoRecord
org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS
org.apache.nifi.processors.mongodb.gridfs.FetchGridFS
org.apache.nifi.processors.mongodb.gridfs.PutGridFS


@@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>DeleteGridFS</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This processor deletes one or more files from GridFS. The query to execute can either be provided in the Query
configuration parameter or generated from the value of the File Name configuration parameter. Upon successful
execution, it will append the query that was executed as an attribute on the flowfile that was processed.
</p>
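<p>
    For example, if the File Name property were set to <em>my-file.txt</em> (a hypothetical value), the processor
    would build and run the query below; the same document could instead be supplied directly in the Query property:
</p>
<pre>
{ "filename": "my-file.txt" }
</pre>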
</body>
</html>


@@ -0,0 +1,43 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>FetchGridFS</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This processor retrieves one or more files from GridFS. The query can be provided in one of three ways:
</p>
<ul>
<li>Query configuration parameter.</li>
<li>Built for you from the File Name configuration parameter, as shown in the example after this list. (Note: this
is just a file name; Mongo queries cannot be embedded in this field.)</li>
<li>Retrieved from the flowfile contents.</li>
</ul>
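<p>
    For example, to fetch a single file named <em>my-file.txt</em> (a hypothetical name), the File Name parameter can
    be set to that name, or the equivalent query below can be supplied through the Query parameter or the flowfile
    contents:
</p>
<pre>
{ "filename": "my-file.txt" }
</pre>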
<p>
The processor can also be configured to commit either once, at the end of the fetch operation, or after each file
that is retrieved. Committing after each file is generally only necessary when retrieving a lot of data from GridFS,
as measured in total data size rather than file count, to ensure that the disks NiFi is using are not overloaded.
</p>
</body>
</html>


@@ -0,0 +1,58 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutGridFS</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This processor puts a file with one or more user-defined metadata values into GridFS in the configured bucket. It
allows the user to define how big each file chunk will be during ingestion, and it can attempt to enforce file
uniqueness using file name or hash values instead of relying solely on a database index.
</p>
<h3>GridFS File Attributes</h3>
<p>
<em>PutGridFS</em> allows flowfile attributes that start with a configured prefix to be added to the GridFS
document. These attributes can be very useful later, when working with GridFS, as metadata describing the file.
</p>
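<p>
    For example, with File Properties Prefix set to <em>prop</em>, incoming attributes such as the hypothetical ones
    below would be stored on the GridFS file with the prefix stripped:
</p>
<pre>
prop.created_by = john.smith      becomes      created_by: "john.smith"
prop.department = Accounting      becomes      department: "Accounting"
</pre>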
<h3>Chunk Size</h3>
<p>
GridFS splits a file up into chunks stored as Mongo documents as the file is ingested into the database. The Chunk Size
configuration parameter controls the maximum size of each chunk. This field should be left at its default value
unless there is a specific business case to increase or decrease it.
</p>
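<p>
    The value can be a fixed size such as the default <em>256 KB</em>, or an Expression Language reference to a
    flowfile attribute, for example <em>${gridfs.chunk.size}</em>.
</p>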
<h3>Uniqueness Enforcement</h3>
<p>
There are four operating modes:
</p>
<ul>
<li>No enforcement at the application level.</li>
<li>Enforce by unique file name.</li>
<li>Enforce by unique hash value.</li>
<li>Use both hash and file name.</li>
</ul>
<p>
By default, the hash value is taken from the attribute <em>hash.value</em>, which can be generated by configuring a
<em>HashContent</em> processor upstream of <em>PutGridFS</em>. Both the hash and the name options query the existing
data to see whether a matching file exists before attempting to write the flowfile contents.
</p>
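<p>
    The duplicate check is a query against the bucket's <em>.files</em> collection before the upload. Assuming the
    default bucket <em>fs</em> and a hypothetical file name, the "Both" mode runs roughly the check below and routes
    the flowfile to the <em>duplicate</em> relationship when the count is not zero:
</p>
<pre>
db.fs.files.count({ "filename": "my-file.txt", "md5": "&lt;value of hash.value&gt;" })
</pre>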
</body>
</html>


@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DeleteGridFSIT extends GridFSITTestBase {
private TestRunner runner;
private static final String BUCKET = "delete_test_bucket";
@Before
public void setup() throws Exception {
runner = TestRunners.newTestRunner(DeleteGridFS.class);
super.setup(runner, BUCKET, false);
}
@After
public void tearDown() {
super.tearDown();
}
@Test
public void testFileAndQueryAtSameTime() {
runner.setProperty(DeleteGridFS.FILE_NAME, "${test_var}");
runner.setProperty(DeleteGridFS.QUERY, "{}");
runner.assertNotValid();
}
@Test
public void testNeitherFileNorQuery() {
runner.assertNotValid();
}
@Test
public void testDeleteByFileName() {
testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), setupTestFile());
}
@Test
public void testDeleteByQuery() {
testDeleteByProperty(DeleteGridFS.QUERY, "{}", setupTestFile());
}
@Test
public void testQueryAttribute() {
String attrName = "gridfs.query.used";
String fileName = setupTestFile();
runner.setProperty(DeleteGridFS.QUERY_ATTRIBUTE, attrName);
testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), fileName);
testForQueryAttribute(fileName, attrName);
}
private void testForQueryAttribute(String mustContain, String attrName) {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteGridFS.REL_SUCCESS);
String attribute = flowFiles.get(0).getAttribute(attrName);
Assert.assertTrue(attribute.contains(mustContain));
}
private String setupTestFile() {
String fileName = "simple-delete-test.txt";
ObjectId id = writeTestFile(fileName, "Hello, world!", BUCKET, new HashMap<>());
Assert.assertNotNull(id);
return fileName;
}
private void testDeleteByProperty(PropertyDescriptor descriptor, String value, String fileName) {
Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
runner.setProperty(descriptor, value);
runner.assertValid();
runner.enqueue("test", attrs);
runner.run();
runner.assertTransferCount(DeleteGridFS.REL_FAILURE, 0);
runner.assertTransferCount(DeleteGridFS.REL_SUCCESS, 1);
Assert.assertFalse(String.format("File %s still exists.", fileName), fileExists(fileName, BUCKET));
}
}


@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.mongodb.QueryHelper;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FetchGridFSIT extends GridFSITTestBase {
TestRunner runner;
static final String BUCKET = "get_test_bucket";
@Before
public void setup() throws Exception {
runner = TestRunners.newTestRunner(FetchGridFS.class);
super.setup(runner, BUCKET, false);
}
@After
public void tearDown() {
super.tearDown();
}
@Test
public void testGetOneByName() {
final String fileName = "get_by_name.txt";
final String content = "Hello, world";
ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
Assert.assertNotNull(id);
String query = String.format("{\"filename\": \"%s\"}", fileName);
runner.enqueue(query);
runner.run();
runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS);
byte[] rawData = runner.getContentAsByteArray(flowFiles.get(0));
Assert.assertEquals("Data did not match for the file", new String(rawData), content);
runner.clearTransferState();
runner.setProperty(FetchGridFS.QUERY, query);
runner.enqueue("test");
runner.run();
runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS);
rawData = runner.getContentAsByteArray(flowFiles.get(0));
Assert.assertEquals("Data did not match for the file", new String(rawData), content);
}
@Test
public void testGetMany() {
String baseName = "test_file_%d.txt";
String content = "Hello, world take %d";
for (int index = 0; index < 5; index++) {
ObjectId id = writeTestFile(String.format(baseName, index), String.format(content, index), BUCKET, new HashMap<>());
Assert.assertNotNull(id);
}
AllowableValue[] values = new AllowableValue[] { QueryHelper.MODE_MANY_COMMITS, QueryHelper.MODE_ONE_COMMIT };
for (AllowableValue value : values) {
String query = "{}";
runner.setProperty(FetchGridFS.OPERATION_MODE, value);
runner.enqueue(query);
runner.run();
runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 5);
runner.clearTransferState();
}
}
@Test
public void testQueryAttribute() {
final String fileName = "get_by_name.txt";
final String content = "Hello, world";
ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
Assert.assertNotNull(id);
final String queryAttr = "gridfs.query.used";
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
runner.setProperty(FetchGridFS.QUERY_ATTRIBUTE, queryAttr);
runner.enqueue(content, attrs);
runner.run();
runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
MockFlowFile mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0);
String attr = mff.getAttribute(queryAttr);
Assert.assertNotNull("Query attribute was null.", attr);
Assert.assertTrue("Wrong content.", attr.contains("filename"));
runner.clearTransferState();
id = writeTestFile(fileName, content, BUCKET, new HashMap<String, Object>(){{
put("lookupKey", "xyz");
}});
Assert.assertNotNull(id);
String query = "{ \"metadata\": { \"lookupKey\": \"xyz\" }}";
runner.removeProperty(FetchGridFS.FILE_NAME);
runner.setProperty(FetchGridFS.QUERY, query);
runner.enqueue(content, attrs);
runner.run();
runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0);
attr = mff.getAttribute(queryAttr);
Assert.assertNotNull("Query attribute was null.", attr);
Assert.assertTrue("Wrong content.", attr.contains("metadata"));
}
@Test
public void testGetQueryFromBody() {
runner.enqueue("{}");
testQueryFromSource(0, 1, 1);
}
@Test
public void testGetQueryFromQueryParam() {
runner.setProperty(FetchGridFS.QUERY, "{}");
runner.enqueue("");
testQueryFromSource(0, 1, 1);
}
@Test
public void testGetQueryFromFileNameParam() {
Map<String, String> attr = new HashMap<>();
attr.put(CoreAttributes.FILENAME.key(), "get_by_name.txt");
runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
runner.enqueue("test", attr);
testQueryFromSource(0, 1, 1);
}
private void testQueryFromSource(int failure, int original, int success) {
final String fileName = "get_by_name.txt";
final String content = "Hello, world";
ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
Assert.assertNotNull(id);
runner.run();
runner.assertTransferCount(FetchGridFS.REL_FAILURE, failure);
runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, original);
runner.assertTransferCount(FetchGridFS.REL_SUCCESS, success);
}
}


@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.util.TestRunner;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.io.ByteArrayInputStream;
import java.util.Map;
public class GridFSITTestBase {
static final String URI = "mongodb://localhost:27017";
static final String DB = "gridfs_test_database";
MongoClient client;
public void setup(TestRunner runner, String bucketName) throws Exception {
setup(runner, bucketName, true);
}
public void setup(TestRunner runner, String bucketName, boolean validate) throws Exception {
MongoDBClientService clientService = new MongoDBControllerService();
runner.addControllerService("clientService", clientService);
runner.setProperty(AbstractGridFSProcessor.CLIENT_SERVICE, "clientService");
runner.setProperty(clientService, MongoDBControllerService.URI, URI);
runner.setProperty(AbstractGridFSProcessor.BUCKET_NAME, bucketName);
runner.setProperty(AbstractGridFSProcessor.DATABASE_NAME, DB);
runner.enableControllerService(clientService);
runner.setValidateExpressionUsage(true);
if (validate) {
runner.assertValid();
}
client = new MongoClient("localhost", 27017);
}
public void tearDown() {
client.dropDatabase(DB);
client.close();
}
public boolean fileExists(String name, String bucketName) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
boolean retVal = it.hasNext();
it.close();
return retVal;
}
public ObjectId writeTestFile(String fileName, String content, String bucketName, Map<String, Object> attrs) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document(attrs));
ByteArrayInputStream input = new ByteArrayInputStream(content.getBytes());
ObjectId retVal = bucket.uploadFromStream(fileName, input, options);
return retVal;
}
public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
MongoCursor it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
boolean retVal = false;
if (it.hasNext()) {
GridFSFile file = (GridFSFile)it.next();
Document metadata = file.getMetadata();
if (metadata != null && metadata.size() == attrs.size()) {
retVal = true;
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object val = attrs.get(entry.getKey());
if (val == null || !entry.getValue().equals(val)) {
retVal = false;
break;
}
}
}
}
it.close();
return retVal;
}
}


@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb.gridfs;
import com.mongodb.client.MongoCollection;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class PutGridFSIT extends GridFSITTestBase {
TestRunner runner;
static final String BUCKET = "put_test_bucket";
@Before
public void setup() throws Exception {
runner = TestRunners.newTestRunner(PutGridFS.class);
runner.setProperty(PutGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
super.setup(runner, BUCKET);
}
@After
public void tearDown() {
super.tearDown();
}
@Test
public void testSimplePut() {
final String fileName = "simple_test.txt";
Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
runner.enqueue("12345", attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
}
@Test
public void testWithProperties() {
final String fileName = "simple_test_props.txt";
Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
attrs.put("prop.created_by", "john.smith");
attrs.put("prop.created_for", "jane.doe");
attrs.put("prop.restrictions", "PHI&PII");
attrs.put("prop.department", "Accounting");
runner.setProperty(PutGridFS.PROPERTIES_PREFIX, "prop");
runner.enqueue("12345", attrs);
runner.run();
runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
attrs = new HashMap<String, String>(){{
put("created_by", "john.smith");
put("created_for", "jane.doe");
put("restrictions", "PHI&PII");
put("department", "Accounting");
}};
Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
Assert.assertTrue("File is missing PARENT_PROPERTIES", fileHasProperties(fileName, BUCKET, attrs));
}
@Test
public void testNoUniqueness() {
String fileName = "test_duplicates.txt";
Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
for (int x = 0; x < 10; x++) {
runner.enqueue("Duplicates are ok.", attrs);
runner.run();
}
runner.assertTransferCount(PutGridFS.REL_SUCCESS, 10);
String bucketName = String.format("%s.files", BUCKET);
MongoCollection files = client.getDatabase(DB).getCollection(bucketName);
Document query = Document.parse(String.format("{\"filename\": \"%s\"}", fileName));
long count = files.count(query);
Assert.assertTrue("Wrong count", count == 10);
}
@Test
public void testFileNameUniqueness() {
String fileName = "test_duplicates.txt";
Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
testUniqueness(attrs, "Hello, world", PutGridFS.UNIQUE_NAME);
}
@Test
public void testFileNameAndHashUniqueness() {
testHashUniqueness(PutGridFS.UNIQUE_BOTH);
}
@Test
public void testHashUniqueness() {
testHashUniqueness(PutGridFS.UNIQUE_HASH);
}
@Test
public void testChunkSize() {
String[] chunkSizes = new String[] { "128 KB", "256 KB", "384 KB", "512KB", "768KB", "1024 KB" };
StringBuilder sb = new StringBuilder();
for (int x = 0; x < 10000; x++) {
sb.append("This is a test string used to build up a largish text file.");
}
final String testData = sb.toString();
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), "big-putgridfs-test-file.txt");
for (String chunkSize : chunkSizes) {
runner.setProperty(PutGridFS.CHUNK_SIZE, chunkSize);
runner.enqueue(testData, attrs);
runner.run();
runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
runner.clearTransferState();
}
runner.setProperty(PutGridFS.CHUNK_SIZE, "${gridfs.chunk.size}");
attrs.put("gridfs.chunk.size", "768 KB");
runner.enqueue(testData, attrs);
runner.run();
runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
}
private void testHashUniqueness(AllowableValue value) {
String hashAttr = "hash.value";
String fileName = "test_duplicates.txt";
String content = "Hello, world";
String hash = DigestUtils.md5Hex(content);
Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.FILENAME.key(), fileName);
attrs.put(hashAttr, hash);
testUniqueness(attrs, content, value);
}
private void testUniqueness(Map<String, String> attrs, String content, AllowableValue param) {
runner.setProperty(PutGridFS.ENFORCE_UNIQUENESS, param);
for (int x = 0; x < 5; x++) {
runner.enqueue(content, attrs);
runner.run();
}
runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 4);
runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
}
}


@@ -40,8 +40,8 @@ import java.util.List;
@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
"Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
"other Mongo-related components."
"Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
"other Mongo-related components."
)
public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
private String uri;