From 67819e5019893fc02022a5d361759b3504276753 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Mon, 26 Jun 2017 12:36:51 -0400 Subject: [PATCH] NIFI-4124 Added org.apache.nifi.processors.mongodb.PutMongoRecord. Added changes based on code review. Changed: * Put record reader instantiation inside of try-with-resources. * Put a batch size for the insert List. * Ensured that session.transfer() to the success relationship will always happen. Removed an unused import to fix the style check. This closes #1945. Signed-off-by: Andy LoPresto --- .../nifi-mongodb-processors/pom.xml | 8 + .../mongodb/AbstractMongoProcessor.java | 45 +++++ .../nifi/processors/mongodb/PutMongo.java | 17 +- .../processors/mongodb/PutMongoRecord.java | 166 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + 5 files changed, 221 insertions(+), 16 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml index 5361aad8b2..0dfad0714f 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml @@ -46,6 +46,14 @@ commons-io commons-io + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-record + org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 6f165c28cf..10f1b61103 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -24,6 +24,7 @@ import java.util.List; import javax.net.ssl.SSLContext; +import com.mongodb.WriteConcern; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -45,6 +46,13 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; public abstract class AbstractMongoProcessor extends AbstractProcessor { + static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED"; + static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED"; + static final String WRITE_CONCERN_FSYNCED = "FSYNCED"; + static final String WRITE_CONCERN_JOURNALED = "JOURNALED"; + static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; + static final String WRITE_CONCERN_MAJORITY = "MAJORITY"; + protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder() .name("Mongo URI") .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") @@ -85,6 +93,15 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .defaultValue("REQUIRED") .build(); + public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() + .name("Write Concern") + .description("The write concern to use") + .required(true) + .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, + WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY) + .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) + .build(); + static List descriptors = new ArrayList<>(); static { @@ -176,4 +193,32 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { protected String getURI(final ProcessContext context) { return 
context.getProperty(URI).evaluateAttributeExpressions().getValue(); } + + protected WriteConcern getWriteConcern(final ProcessContext context) { + final String writeConcernProperty = context.getProperty(WRITE_CONCERN).getValue(); + WriteConcern writeConcern = null; + switch (writeConcernProperty) { + case WRITE_CONCERN_ACKNOWLEDGED: + writeConcern = WriteConcern.ACKNOWLEDGED; + break; + case WRITE_CONCERN_UNACKNOWLEDGED: + writeConcern = WriteConcern.UNACKNOWLEDGED; + break; + case WRITE_CONCERN_FSYNCED: + writeConcern = WriteConcern.FSYNCED; + break; + case WRITE_CONCERN_JOURNALED: + writeConcern = WriteConcern.JOURNALED; + break; + case WRITE_CONCERN_REPLICA_ACKNOWLEDGED: + writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED; + break; + case WRITE_CONCERN_MAJORITY: + writeConcern = WriteConcern.MAJORITY; + break; + default: + writeConcern = WriteConcern.ACKNOWLEDGED; + } + return writeConcern; + } } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index 5b5ad523d4..cd4263594b 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -59,13 +59,6 @@ public class PutMongo extends AbstractMongoProcessor { static final String MODE_INSERT = "insert"; static final String MODE_UPDATE = "update"; - static final String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED"; - static final String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED"; - static final String WRITE_CONCERN_FSYNCED = "FSYNCED"; - static final String WRITE_CONCERN_JOURNALED = "JOURNALED"; - static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; - static final String 
WRITE_CONCERN_MAJORITY = "MAJORITY"; - static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() .name("Mode") .description("Indicates whether the processor should insert or update content") @@ -90,14 +83,6 @@ public class PutMongo extends AbstractMongoProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("_id") .build(); - static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder() - .name("Write Concern") - .description("The write concern to use") - .required(true) - .allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED, - WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY) - .defaultValue(WRITE_CONCERN_ACKNOWLEDGED) - .build(); static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() .name("Character Set") .description("The Character Set in which the data is encoded") @@ -131,7 +116,7 @@ public class PutMongo extends AbstractMongoProcessor { } @Override - public final List getSupportedPropertyDescriptors() { + public List getSupportedPropertyDescriptors() { return propertyDescriptors; } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java new file mode 100644 index 0000000000..116c575fe3 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java @@ -0,0 +1,166 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import com.mongodb.WriteConcern; +import com.mongodb.client.MongoCollection; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.bson.Document; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@Tags({"mongodb", "insert", "record", "put"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) 
+@CapabilityDescription("Bulk ingest documents into MonogDB using a configured record reader.") +public class PutMongoRecord extends AbstractMongoProcessor { + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFiles that are written to MongoDB are routed to this relationship").build(); + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build(); + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder() + .name("insert_count") + .displayName("Insert Batch Size") + .description("The number of records to group together for one single insert operation against MongoDB.") + .defaultValue("100") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private final static Set relationships; + private final static List propertyDescriptors; + + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(WRITE_CONCERN); + _propertyDescriptors.add(RECORD_READER_FACTORY); + _propertyDescriptors.add(INSERT_COUNT); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + final Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public List getSupportedPropertyDescriptors() { + 
return propertyDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY) + .asControllerService(RecordReaderFactory.class); + + final WriteConcern writeConcern = getWriteConcern(context); + + final MongoCollection collection = getCollection(context).withWriteConcern(writeConcern); + + List inserts = new ArrayList<>(); + int ceiling = context.getProperty(INSERT_COUNT).asInteger(); + int added = 0; + boolean error = false; + + try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) { + RecordSchema schema = reader.getSchema(); + Record record; + while ((record = reader.nextRecord()) != null) { + Document document = new Document(); + for (String name : schema.getFieldNames()) { + document.put(name, record.getValue(name)); + } + inserts.add(document); + if (inserts.size() == ceiling) { + collection.insertMany(inserts); + added += inserts.size(); + inserts = new ArrayList<>(); + } + } + if (inserts.size() > 0) { + collection.insertMany(inserts); + } + } catch (SchemaNotFoundException | IOException | MalformedRecordException e) { + getLogger().error("PutMongoRecord failed with error:", e); + session.transfer(flowFile, REL_FAILURE); + error = true; + } finally { + if (!error) { + session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue(), String.format("Added %d documents to MongoDB.", added)); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Inserted {} records into MongoDB", new Object[]{ added }); + } + } + session.commit(); +/* final ComponentLog logger = getLogger(); + + if (inserts.size() > 0) { + try { + collection.insertMany(inserts); + + session.getProvenanceReporter().send(flowFile, 
context.getProperty(URI).getValue()); + session.transfer(flowFile, REL_SUCCESS); + + } catch (Exception e) { + logger.error("Failed to insert {} into MongoDB due to {}", new Object[]{flowFile, e}, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + }*/ + } +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 92e1cf71d2..8a17ad8e99 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.processors.mongodb.GetMongo org.apache.nifi.processors.mongodb.PutMongo +org.apache.nifi.processors.mongodb.PutMongoRecord \ No newline at end of file