mirror of https://github.com/apache/nifi.git
NIFI-10949 Updated MongoDB components using current driver (#6759)
- Replaced references to mongodb-driver-legacy with mongodb-driver-sync - Updated components to use current synchronous driver classes - Upgraded MongoDB driver from 4.7.2 to 4.8.0
This commit is contained in:
parent
e97977d61f
commit
6c517446a4
|
@ -38,18 +38,12 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-legacy</artifactId>
|
||||
<version>${mongo.driver.version}</version>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-json-utils</artifactId>
|
||||
<version>1.20.0-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -28,8 +28,8 @@
|
|||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-legacy</artifactId>
|
||||
<version>${mongo.driver.version}</version>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
|
|
@ -19,15 +19,14 @@
|
|||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoClientOptions.Builder;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
|
@ -230,7 +229,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
|
|||
protected MongoDBClientService clientService;
|
||||
|
||||
@OnScheduled
|
||||
public final void createClient(ProcessContext context) throws IOException {
|
||||
public final void createClient(ProcessContext context) {
|
||||
if (context.getProperty(CLIENT_SERVICE).isSet()) {
|
||||
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
|
||||
return;
|
||||
|
@ -253,21 +252,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
if(sslContext == null) {
|
||||
mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
|
||||
} else {
|
||||
mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
|
||||
}
|
||||
final String uri = getURI(context);
|
||||
final MongoClientSettings.Builder builder = getClientSettings(uri, sslContext);
|
||||
final MongoClientSettings clientSettings = builder.build();
|
||||
mongoClient = MongoClients.create(clientSettings);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected Builder getClientOptions(final SSLContext sslContext) {
|
||||
MongoClientOptions.Builder builder = MongoClientOptions.builder();
|
||||
builder.sslEnabled(true);
|
||||
builder.sslContext(sslContext);
|
||||
protected MongoClientSettings.Builder getClientSettings(final String uri, final SSLContext sslContext) {
|
||||
final MongoClientSettings.Builder builder = MongoClientSettings.builder();
|
||||
builder.applyConnectionString(new ConnectionString(uri));
|
||||
if (sslContext != null) {
|
||||
builder.applyToSslSettings(sslBuilder ->
|
||||
sslBuilder.enabled(true).context(sslContext)
|
||||
);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,182 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.mongodb
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes
|
||||
import org.apache.nifi.json.JsonRecordSetWriter
|
||||
import org.apache.nifi.mongodb.MongoDBClientService
|
||||
import org.apache.nifi.mongodb.MongoDBControllerService
|
||||
import org.apache.nifi.schema.access.SchemaAccessUtils
|
||||
import org.apache.nifi.serialization.DateTimeUtils
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema
|
||||
import org.apache.nifi.serialization.record.*
|
||||
import org.apache.nifi.util.TestRunner
|
||||
import org.apache.nifi.util.TestRunners
|
||||
import org.bson.Document
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.Assertions
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.testcontainers.containers.MongoDBContainer
|
||||
import org.testcontainers.junit.jupiter.Container
|
||||
import org.testcontainers.junit.jupiter.Testcontainers
|
||||
import org.testcontainers.utility.DockerImageName
|
||||
|
||||
import static groovy.json.JsonOutput.*
|
||||
|
||||
class GetMongoRecordIT extends AbstractMongoIT {
|
||||
TestRunner runner
|
||||
MongoDBClientService service
|
||||
|
||||
static RecordSchema SCHEMA
|
||||
static final String DB_NAME = GetMongoRecord.class.simpleName + Calendar.instance.timeInMillis
|
||||
static final String COL_NAME = "test"
|
||||
|
||||
static {
|
||||
def fields = [
|
||||
new RecordField("name", RecordFieldType.STRING.dataType),
|
||||
new RecordField("failedLogins", RecordFieldType.INT.dataType),
|
||||
new RecordField("lastLogin", RecordFieldType.DATE.dataType)
|
||||
]
|
||||
SCHEMA = new SimpleRecordSchema(fields, new StandardSchemaIdentifier.Builder().name("sample").build())
|
||||
}
|
||||
|
||||
static final List<Map> SAMPLES = [
|
||||
[ name: "John Smith", failedLogins: 2, lastLogin: Calendar.instance.time ],
|
||||
[ name: "Jane Doe", failedLogins: 1, lastLogin: new Date(Calendar.instance.time.time - 360000) ],
|
||||
[ name: "John Brown", failedLogins: 4, lastLogin: new Date(Calendar.instance.time.time - 10000) ]
|
||||
].collect { new Document(it) }
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
runner = TestRunners.newTestRunner(GetMongoRecord.class)
|
||||
service = new MongoDBControllerService()
|
||||
runner.addControllerService("client", service)
|
||||
runner.setProperty(service, MongoDBControllerService.URI, MONGO_CONTAINER.getConnectionString())
|
||||
runner.enableControllerService(service)
|
||||
|
||||
def writer = new JsonRecordSetWriter()
|
||||
def registry = new MockSchemaRegistry()
|
||||
registry.addSchema("sample", SCHEMA)
|
||||
|
||||
runner.addControllerService("writer", writer)
|
||||
runner.addControllerService("registry", registry)
|
||||
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
|
||||
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
|
||||
runner.setProperty(writer, DateTimeUtils.DATE_FORMAT, "yyyy")
|
||||
runner.enableControllerService(registry)
|
||||
runner.enableControllerService(writer)
|
||||
|
||||
runner.setProperty(GetMongoRecord.DATABASE_NAME, DB_NAME)
|
||||
runner.setProperty(GetMongoRecord.COLLECTION_NAME, COL_NAME)
|
||||
runner.setProperty(GetMongoRecord.CLIENT_SERVICE, "client")
|
||||
runner.setProperty(GetMongoRecord.WRITER_FACTORY, "writer")
|
||||
|
||||
service.getDatabase(DB_NAME).getCollection(COL_NAME).insertMany(SAMPLES)
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void after() {
|
||||
service.getDatabase(DB_NAME).drop()
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLookup() {
|
||||
def ffValidator = { TestRunner runner ->
|
||||
def ffs = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)
|
||||
Assertions.assertNotNull(ffs)
|
||||
Assertions.assertTrue(ffs.size() == 1)
|
||||
Assertions.assertEquals("3", ffs[0].getAttribute("record.count"))
|
||||
Assertions.assertEquals("application/json", ffs[0].getAttribute(CoreAttributes.MIME_TYPE.key()))
|
||||
Assertions.assertEquals(COL_NAME, ffs[0].getAttribute(GetMongoRecord.COL_NAME))
|
||||
Assertions.assertEquals(DB_NAME, ffs[0].getAttribute(GetMongoRecord.DB_NAME))
|
||||
Assertions.assertEquals(Document.parse("{}"), Document.parse(ffs[0].getAttribute("executed.query")))
|
||||
}
|
||||
|
||||
runner.setProperty(GetMongoRecord.QUERY_ATTRIBUTE, "executed.query")
|
||||
runner.setProperty(GetMongoRecord.QUERY, "{}")
|
||||
runner.enqueue("", [ "schema.name": "sample"])
|
||||
runner.run()
|
||||
|
||||
runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
|
||||
runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
|
||||
runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
|
||||
|
||||
ffValidator(runner)
|
||||
|
||||
runner.clearTransferState()
|
||||
runner.removeProperty(GetMongoRecord.QUERY)
|
||||
runner.enqueue("{}", [ "schema.name": "sample"])
|
||||
runner.run()
|
||||
|
||||
runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
|
||||
runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
|
||||
runner.assertTransferCount(GetMongoRecord.REL_ORIGINAL, 1)
|
||||
|
||||
ffValidator(runner)
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSortAndProjection() {
|
||||
runner.setIncomingConnection(false)
|
||||
runner.setVariable("schema.name", "sample")
|
||||
runner.setProperty(GetMongoRecord.SORT, toJson([failedLogins: 1]))
|
||||
runner.setProperty(GetMongoRecord.PROJECTION, toJson([failedLogins: 1]))
|
||||
runner.setProperty(GetMongoRecord.QUERY, "{}")
|
||||
runner.run()
|
||||
|
||||
def parsed = sharedTest()
|
||||
Assertions.assertEquals(3, parsed.size())
|
||||
def values = [1, 2, 4]
|
||||
int index = 0
|
||||
parsed.each {
|
||||
Assertions.assertEquals(values[index++], it["failedLogins"])
|
||||
Assertions.assertNull(it["name"])
|
||||
Assertions.assertNull(it["lastLogin"])
|
||||
}
|
||||
}
|
||||
|
||||
List<Map<String, Object>> sharedTest() {
|
||||
runner.assertTransferCount(GetMongoRecord.REL_FAILURE, 0)
|
||||
runner.assertTransferCount(GetMongoRecord.REL_SUCCESS, 1)
|
||||
|
||||
def ff = runner.getFlowFilesForRelationship(GetMongoRecord.REL_SUCCESS)[0]
|
||||
def raw = runner.getContentAsByteArray(ff)
|
||||
String content = new String(raw)
|
||||
def parsed = new JsonSlurper().parseText(content)
|
||||
Assertions.assertNotNull(parsed)
|
||||
|
||||
parsed
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLimit() {
|
||||
runner.setIncomingConnection(false)
|
||||
runner.setProperty(GetMongoRecord.LIMIT, "1")
|
||||
runner.setProperty(GetMongoRecord.QUERY, "{}")
|
||||
runner.setVariable("schema.name", "sample")
|
||||
runner.run()
|
||||
|
||||
def parsed = sharedTest()
|
||||
Assertions.assertEquals(1, parsed.size())
|
||||
|
||||
}
|
||||
}
|
|
@ -16,8 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoClientOptions.Builder;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
|
@ -32,7 +31,7 @@ import javax.net.ssl.SSLContext;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -42,7 +41,7 @@ public class AbstractMongoProcessorTest {
|
|||
private TestRunner testRunner;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
public void setUp() {
|
||||
processor = new MockAbstractMongoProcessor();
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
}
|
||||
|
@ -80,8 +79,8 @@ public class AbstractMongoProcessorTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Builder getClientOptions(SSLContext sslContext) {
|
||||
return MongoClientOptions.builder();
|
||||
protected MongoClientSettings.Builder getClientSettings(final String uri, final SSLContext sslContext) {
|
||||
return MongoClientSettings.builder();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -86,7 +86,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
|
||||
runner.setIncomingConnection(false);
|
||||
|
||||
mongoClient = new MongoClient(new MongoClientURI(MONGO_CONTAINER.getConnectionString()));
|
||||
mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
|
||||
|
||||
MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
|
||||
collection.insertMany(DOCUMENTS);
|
||||
|
@ -186,7 +186,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testReadOneDocument() throws Exception {
|
||||
public void testReadOneDocument() {
|
||||
runner.setVariable("query", "{a: 1, b: 3}");
|
||||
runner.setProperty(GetMongo.QUERY, "${query}");
|
||||
runner.run();
|
||||
|
@ -197,7 +197,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testReadMultipleDocuments() throws Exception {
|
||||
public void testReadMultipleDocuments() {
|
||||
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
|
||||
|
@ -209,7 +209,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testProjection() throws Exception {
|
||||
public void testProjection() {
|
||||
runner.setProperty(GetMongo.QUERY, "{\"a\": 1, \"b\": 3}");
|
||||
runner.setProperty(GetMongo.PROJECTION, "{\"_id\": 0, \"a\": 1}");
|
||||
runner.run();
|
||||
|
@ -221,7 +221,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSort() throws Exception {
|
||||
public void testSort() {
|
||||
runner.setVariable("sort", "{a: -1, b: -1, c: 1}");
|
||||
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
|
||||
runner.setProperty(GetMongo.SORT, "${sort}");
|
||||
|
@ -235,7 +235,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testLimit() throws Exception {
|
||||
public void testLimit() {
|
||||
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
|
||||
runner.setProperty(GetMongo.LIMIT, "${limit}");
|
||||
runner.setVariable("limit", "1");
|
||||
|
@ -247,7 +247,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testResultsPerFlowfile() throws Exception {
|
||||
public void testResultsPerFlowfile() {
|
||||
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}");
|
||||
runner.setVariable("results.per.flowfile", "2");
|
||||
runner.enqueue("{}");
|
||||
|
@ -261,7 +261,7 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testBatchSize() throws Exception {
|
||||
public void testBatchSize() {
|
||||
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
|
||||
runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}");
|
||||
runner.setVariable("batch.size", "1");
|
||||
|
@ -656,21 +656,4 @@ public class GetMongoIT extends AbstractMongoIT {
|
|||
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
|
||||
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
|
||||
}
|
||||
|
||||
public void testSendEmpty() throws Exception {
|
||||
runner.setIncomingConnection(true);
|
||||
runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true");
|
||||
runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }");
|
||||
runner.assertValid();
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
|
||||
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
|
||||
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
|
||||
|
||||
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
|
||||
MockFlowFile flowFile = flowFiles.get(0);
|
||||
assertEquals(0, flowFile.getSize());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
|
||||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -44,7 +44,7 @@ public class MongoWriteTestBase extends AbstractMongoIT {
|
|||
|
||||
public void setup(Class processor) {
|
||||
DATABASE_NAME = processor.getSimpleName().toLowerCase();
|
||||
mongoClient = new MongoClient(new MongoClientURI(MONGO_CONTAINER.getConnectionString()));
|
||||
mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
|
||||
collection = mongoClient.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME);
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.junit.jupiter.api.AfterEach;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -81,10 +80,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
return runner;
|
||||
}
|
||||
|
||||
private byte[] documentToByteArray(Document doc) {
|
||||
return doc.toJson().getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidators() throws Exception {
|
||||
TestRunner runner = TestRunners.newTestRunner(PutMongoRecord.class);
|
||||
|
@ -213,12 +208,8 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
|
||||
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongoRecord.REL_SUCCESS).get(0);
|
||||
|
||||
|
||||
// verify 1 doc inserted into the collection
|
||||
assertEquals(4, collection.countDocuments());
|
||||
//assertEquals(doc, collection.find().first());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -250,7 +241,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
|
||||
@Test
|
||||
void testUpsertAsInsert() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner runner = init();
|
||||
|
||||
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
|
||||
|
@ -293,14 +283,11 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
}}
|
||||
));
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testUpsertSuccess(runner, inputs, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpsertAsUpdate() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner runner = init();
|
||||
|
||||
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
|
||||
|
@ -353,13 +340,11 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
}}
|
||||
));
|
||||
|
||||
// WHEN
|
||||
testUpsertSuccess(runner, inputs, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpsertAsInsertAndUpdate() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner runner = init();
|
||||
|
||||
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
|
||||
|
@ -408,13 +393,11 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
}}
|
||||
));
|
||||
|
||||
// WHEN
|
||||
testUpsertSuccess(runner, inputs, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRouteToFailureWhenKeyFieldDoesNotExist() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner runner = init();
|
||||
|
||||
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,non_existent_field");
|
||||
|
@ -436,17 +419,13 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
)
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testUpsertFailure(runner, inputs);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateMany() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner initRunner = init();
|
||||
|
||||
// Init Mongo data
|
||||
recordReader.addSchemaField("name", RecordFieldType.STRING);
|
||||
recordReader.addSchemaField("team", RecordFieldType.STRING);
|
||||
recordReader.addSchemaField("color", RecordFieldType.STRING);
|
||||
|
@ -503,17 +482,13 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
}}
|
||||
));
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testUpsertSuccess(updateRunner, inputs, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateModeFFAttributeSetToMany() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner initRunner = init();
|
||||
|
||||
// Init Mongo data
|
||||
recordReader.addSchemaField("name", RecordFieldType.STRING);
|
||||
recordReader.addSchemaField("team", RecordFieldType.STRING);
|
||||
recordReader.addSchemaField("color", RecordFieldType.STRING);
|
||||
|
@ -570,7 +545,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
}}
|
||||
));
|
||||
|
||||
// WHEN
|
||||
inputs.forEach(input -> {
|
||||
input.forEach(recordReader::addRecord);
|
||||
|
||||
|
@ -582,7 +556,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
updateRunner.run();
|
||||
});
|
||||
|
||||
// THEN
|
||||
assertEquals(0, updateRunner.getQueueSize().getObjectCount());
|
||||
|
||||
updateRunner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
|
||||
|
@ -614,14 +587,11 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
)
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testUpsertFailure(runner, inputs);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRouteToFailureWhenKeyFieldReferencesNonEmbeddedDocument() throws Exception {
|
||||
// GIVEN
|
||||
TestRunner runner = init();
|
||||
|
||||
runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,id.is_not_an_embedded_document");
|
||||
|
@ -643,15 +613,10 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
)
|
||||
);
|
||||
|
||||
// WHEN
|
||||
// THEN
|
||||
testUpsertFailure(runner, inputs);
|
||||
}
|
||||
|
||||
private void testUpsertSuccess(TestRunner runner, List<List<Object[]>> inputs, Set<Map<String, Object>> expected) {
|
||||
// GIVEN
|
||||
|
||||
// WHEN
|
||||
inputs.forEach(input -> {
|
||||
input.forEach(recordReader::addRecord);
|
||||
|
||||
|
@ -659,7 +624,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
runner.run();
|
||||
});
|
||||
|
||||
// THEN
|
||||
assertEquals(0, runner.getQueueSize().getObjectCount());
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
|
||||
|
@ -675,10 +639,8 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
}
|
||||
|
||||
private void testUpsertFailure(TestRunner runner, List<List<Object[]>> inputs) {
|
||||
// GIVEN
|
||||
Set<Object> expected = Collections.emptySet();
|
||||
|
||||
// WHEN
|
||||
inputs.forEach(input -> {
|
||||
input.forEach(recordReader::addRecord);
|
||||
|
||||
|
@ -686,7 +648,6 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
|
|||
runner.run();
|
||||
});
|
||||
|
||||
// THEN
|
||||
assertEquals(0, runner.getQueueSize().getObjectCount());
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_FAILURE, inputs.size());
|
||||
|
|
|
@ -20,8 +20,8 @@
|
|||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import org.apache.nifi.mongodb.MongoDBClientService;
|
||||
import org.apache.nifi.mongodb.MongoDBControllerService;
|
||||
|
@ -67,7 +67,7 @@ public class RunMongoAggregationIT extends AbstractMongoIT {
|
|||
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
|
||||
runner.setProperty(RunMongoAggregation.QUERY_ATTRIBUTE, AGG_ATTR);
|
||||
|
||||
mongoClient = new MongoClient(new MongoClientURI(MONGO_CONTAINER.getConnectionString()));
|
||||
mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
|
||||
|
||||
MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
|
||||
String[] values = new String[] { "a", "b", "c" };
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
|
||||
package org.apache.nifi.processors.mongodb.gridfs;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoCursor;
|
||||
import com.mongodb.client.gridfs.GridFSBucket;
|
||||
import com.mongodb.client.gridfs.GridFSBuckets;
|
||||
|
@ -56,10 +57,10 @@ public class GridFSITTestBase extends AbstractMongoIT {
|
|||
runner.assertValid();
|
||||
}
|
||||
|
||||
client = new MongoClient(MONGO_CONTAINER.getConnectionString());
|
||||
client = MongoClients.create(MONGO_CONTAINER.getConnectionString());
|
||||
}
|
||||
public void tearDown() {
|
||||
client.dropDatabase(DB);
|
||||
client.getDatabase(DB).drop();
|
||||
client.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -53,8 +53,8 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-legacy</artifactId>
|
||||
<version>${mongo.driver.version}</version>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientOptions;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -88,21 +89,24 @@ public class MongoDBControllerService extends AbstractControllerService implemen
|
|||
}
|
||||
|
||||
try {
|
||||
if(sslContext == null) {
|
||||
return new MongoClient(new MongoClientURI(getURI(context)));
|
||||
} else {
|
||||
return new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
|
||||
}
|
||||
final String uri = getURI(context);
|
||||
final MongoClientSettings.Builder builder = getClientSettings(uri, sslContext);
|
||||
final MongoClientSettings clientSettings = builder.build();
|
||||
return MongoClients.create(clientSettings);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected MongoClientOptions.Builder getClientOptions(final SSLContext sslContext) {
|
||||
MongoClientOptions.Builder builder = MongoClientOptions.builder();
|
||||
builder.sslEnabled(true);
|
||||
builder.sslContext(sslContext);
|
||||
protected MongoClientSettings.Builder getClientSettings(final String uri, final SSLContext sslContext) {
|
||||
final MongoClientSettings.Builder builder = MongoClientSettings.builder();
|
||||
builder.applyConnectionString(new ConnectionString(uri));
|
||||
if (sslContext != null) {
|
||||
builder.applyToSslSettings(sslBuilder ->
|
||||
sslBuilder.enabled(true).context(sslContext)
|
||||
);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.mongodb;
|
||||
|
||||
import com.mongodb.MongoClientURI;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
|
||||
public class Validation {
|
||||
public static final Validator DOCUMENT_VALIDATOR = new Validator() {
|
||||
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String value, ValidationContext context) {
|
||||
final ValidationResult.Builder builder = new ValidationResult.Builder();
|
||||
builder.subject(subject).input(value);
|
||||
|
||||
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
|
||||
return builder.valid(true).explanation("Contains Expression Language").build();
|
||||
}
|
||||
|
||||
String reason = null;
|
||||
try {
|
||||
new MongoClientURI(value);
|
||||
} catch (final Exception e) {
|
||||
reason = e.getLocalizedMessage();
|
||||
}
|
||||
|
||||
return builder.explanation(reason).valid(reason == null).build();
|
||||
}
|
||||
};
|
||||
}
|
|
@ -52,12 +52,12 @@ public class MongoDBControllerServiceIT extends AbstractMongoIT {
|
|||
}
|
||||
|
||||
@AfterEach
|
||||
public void after() throws Exception {
|
||||
public void after() {
|
||||
service.onDisable();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInit() throws Exception {
|
||||
public void testInit() {
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,11 +35,16 @@
|
|||
</modules>
|
||||
|
||||
<properties>
|
||||
<mongo.driver.version>4.7.2</mongo.driver.version>
|
||||
<mongo.driver.version>4.8.0</mongo.driver.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>mongodb-driver-sync</artifactId>
|
||||
<version>${mongo.driver.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mongodb-processors</artifactId>
|
||||
|
|
Loading…
Reference in New Issue