From 25e0bbb68dfb26ee49216c53a481280c776cce45 Mon Sep 17 00:00:00 2001
From: Matthew Burgess
Date: Wed, 7 Feb 2018 18:15:35 -0500
Subject: [PATCH] NIFI-4853 - Fixed PutMongoRecord handling of nested records

Signed-off-by: Pierre Villard

This closes #2457.
---
 .../nifi-mongodb-processors/pom.xml                |   5 +
 .../processors/mongodb/PutMongoRecord.java         |  29 +--
 .../mongodb/PutMongoRecordTest.java                | 188 ++++++++++++++++++
 3 files changed, 203 insertions(+), 19 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordTest.java

diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
index 8fbdbfbd7b..2af3cbb814 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index a5a826c430..59906c732b 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -35,14 +34,18 @@ import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.bson.Document;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 @EventDriven
@@ -117,13 +120,16 @@ public class PutMongoRecord extends AbstractMongoProcessor {
         int added = 0;
         boolean error = false;
 
-        try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
+        try (final InputStream inStream = session.read(flowFile);
+             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
             RecordSchema schema = reader.getSchema();
             Record record;
             while ((record = reader.nextRecord()) != null) {
+                // Convert each Record to a Map (nested Records become nested Maps) and put its fields into the Mongo document
+                Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
                 Document document = new Document();
                 for (String name : schema.getFieldNames()) {
-                    document.put(name, record.getValue(name));
+                    document.put(name, contentMap.get(name));
                 }
                 inserts.add(document);
                 if (inserts.size() == ceiling) {
@@ -141,26 +147,11 @@
             error = true;
         } finally {
             if (!error) {
-                session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue(), String.format("Added %d documents to MongoDB.", added));
+                session.getProvenanceReporter().send(flowFile, context.getProperty(URI).evaluateAttributeExpressions().getValue(), String.format("Added %d documents to MongoDB.", added));
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
             }
         }
         session.commit();
-/*        final ComponentLog logger = getLogger();
-
-        if (inserts.size() > 0) {
-            try {
-                collection.insertMany(inserts);
-
-                session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue());
-                session.transfer(flowFile, REL_SUCCESS);
-
-            } catch (Exception e) {
-                logger.error("Failed to insert {} into MongoDB due to {}", new Object[]{flowFile, e}, e);
-                session.transfer(flowFile, REL_FAILURE);
-                context.yield();
-            }
-        }*/
     }
 }
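
For illustration, a minimal standalone sketch of the conversion the change above relies on. It is not part of the patch: the class name ConversionSketch and the sample data are hypothetical, but every API it calls (SimpleRecordSchema, MapRecord, RecordFieldType.RECORD.getRecordDataType, DataTypeUtils.convertRecordFieldtoObject) appears in this diff. The old code put record.getValue(name) straight into the Document, so a nested field arrived as a Record object that the BSON codecs could not encode; the new code first converts the whole Record into plain Java Maps, which Document stores as a nested sub-document.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;
    import org.bson.Document;

    public class ConversionSketch {
        public static void main(String[] args) {
            // Build a schema with one flat field and one nested record field
            List<RecordField> personFields = new ArrayList<>();
            personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
            RecordSchema personSchema = new SimpleRecordSchema(personFields);

            List<RecordField> fields = new ArrayList<>();
            fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
            fields.add(new RecordField("person", RecordFieldType.RECORD.getRecordDataType(personSchema)));
            RecordSchema schema = new SimpleRecordSchema(fields);

            Map<String, Object> person = new HashMap<>();
            person.put("name", "John Doe");
            Map<String, Object> values = new HashMap<>();
            values.put("id", 1);
            values.put("person", new MapRecord(personSchema, person));
            Record record = new MapRecord(schema, values);

            // record.getValue("person") is a MapRecord, which the Mongo codec registry
            // cannot encode; convertRecordFieldtoObject recursively turns the Record
            // into plain Maps that Document handles natively.
            Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(
                    record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));

            Document document = new Document();
            for (String name : schema.getFieldNames()) {
                document.put(name, contentMap.get(name));
            }
            // Prints a nested document, roughly: {"id": 1, "person": {"name": "John Doe"}}
            System.out.println(document.toJson());
        }
    }
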
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordTest.java
new file mode 100644
index 0000000000..a8cbf8263b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mongodb;
+
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore("Integration tests that cause failures in some environments")
+public class PutMongoRecordTest extends MongoWriteTestBase {
+
+    private MockRecordParser recordReader;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup(PutMongoRecord.class);
+        recordReader = new MockRecordParser();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader");
+    }
+
+    @After
+    public void teardown() {
+        super.teardown();
+    }
+
+    private byte[] documentToByteArray(Document doc) {
+        return doc.toJson().getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testValidators() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(PutMongoRecord.class);
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        // missing uri, db, collection, RecordReader
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        results = new HashSet<>();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(4, results.size());
+        Iterator<ValidationResult> it = results.iterator();
+        Assert.assertTrue(it.next().toString().contains("is invalid because Mongo URI is required"));
+        Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
+        Assert.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
+        Assert.assertTrue(it.next().toString().contains("is invalid because Record Reader is required"));
+
+        // invalid write concern
+        runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+        runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
+        runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+        runner.setProperty(PutMongoRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(PutMongoRecord.WRITE_CONCERN, "xyz");
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        results = new HashSet<>();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        Assert.assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*"));
+
+        // valid write concern
+        runner.setProperty(PutMongoRecord.WRITE_CONCERN, PutMongoRecord.WRITE_CONCERN_UNACKNOWLEDGED);
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        results = new HashSet<>();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(0, results.size());
+    }
+
+    @Test
+    public void testInsertFlatRecords() throws Exception {
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling");
+        recordReader.addRecord("Jimmy Doe", 14, null);
+        recordReader.addRecord("Pizza Doe", 14, null);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
+        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongoRecord.REL_SUCCESS).get(0);
+
+        // verify that all five records were inserted into the collection
+        assertEquals(5, collection.count());
+    }
+
+    @Test
+    public void testInsertNestedRecords() throws Exception {
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        final List<RecordField> personFields = new ArrayList<>();
+        final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
+        final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
+        final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
+        personFields.add(nameField);
+        personFields.add(ageField);
+        personFields.add(sportField);
+        final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+        recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+            put("name", "John Doe");
+            put("age", 48);
+            put("sport", "Soccer");
+        }}));
+        recordReader.addRecord(2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+            put("name", "Jane Doe");
+            put("age", 47);
+            put("sport", "Tennis");
+        }}));
+        recordReader.addRecord(3, new MapRecord(personSchema, new HashMap<String, Object>() {{
+            put("name", "Sally Doe");
+            put("age", 47);
+            put("sport", "Curling");
+        }}));
+        recordReader.addRecord(4, new MapRecord(personSchema, new HashMap<String, Object>() {{
+            put("name", "Jimmy Doe");
+            put("age", 14);
+            put("sport", null);
+        }}));
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
+        MockFlowFile out = runner.getFlowFilesForRelationship(PutMongoRecord.REL_SUCCESS).get(0);
+
+        // verify that all four records were inserted into the collection
+        assertEquals(4, collection.count());
+    }
+}
\ No newline at end of file
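
Note: the whole test class is annotated @Ignore because these are integration tests that need a live MongoDB reachable at MONGO_URI (presumably supplied, along with the runner, collection, DATABASE_NAME, and COLLECTION_NAME members used above, by MongoWriteTestBase). Run against a live instance, testInsertNestedRecords should leave documents shaped roughly like the following; this is illustrative only, since the test asserts just the document count:

    { "id": 1, "person": { "name": "John Doe", "age": 48, "sport": "Soccer" } }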