NIFI-8208: Upgrade MongoDB driver to 4.3 series

- Add AllowableValue objects for write concern values
- Remove warnings for using deprecated write concerns in PutMongo

This closes #5392

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lance Kinley 2021-09-16 08:59:09 -07:00 committed by exceptionfactory
parent 67ccdf6159
commit 30efcd35e4
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
14 changed files with 108 additions and 39 deletions

View File

@ -38,7 +38,7 @@
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<artifactId>mongodb-driver-legacy</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
<dependency>

View File

@ -19,6 +19,7 @@ package org.apache.nifi.mongodb;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
@ -35,6 +36,39 @@ public interface MongoDBClientService extends ControllerService {
String WRITE_CONCERN_JOURNALED = "JOURNALED";
String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
String WRITE_CONCERN_MAJORITY = "MAJORITY";
String WRITE_CONCERN_W1 = "W1";
String WRITE_CONCERN_W2 = "W2";
String WRITE_CONCERN_W3 = "W3";
static final AllowableValue WRITE_CONCERN_ACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_ACKNOWLEDGED,
"Write operations that use this write concern will wait for acknowledgement, " +
"using the default write concern configured on the server");
static final AllowableValue WRITE_CONCERN_UNACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED,
"Write operations that use this write concern will return as soon as the message is written to the socket. " +
"Exceptions are raised for network issues, but not server errors");
static final AllowableValue WRITE_CONCERN_FSYNCED_VALUE = new AllowableValue(
WRITE_CONCERN_FSYNCED, WRITE_CONCERN_FSYNCED,
"Deprecated. Use of \"" + WRITE_CONCERN_JOURNALED + "\" is preferred");
static final AllowableValue WRITE_CONCERN_JOURNALED_VALUE = new AllowableValue(
WRITE_CONCERN_JOURNALED, WRITE_CONCERN_JOURNALED,
"Write operations wait for the server to group commit to the journal file on disk");
static final AllowableValue WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_REPLICA_ACKNOWLEDGED,
"Deprecated. Use of \"" + WRITE_CONCERN_W2 + "\" is preferred");
static final AllowableValue WRITE_CONCERN_MAJORITY_VALUE = new AllowableValue(
WRITE_CONCERN_MAJORITY, WRITE_CONCERN_MAJORITY,
"Exceptions are raised for network issues, and server errors; waits on a majority of servers for the write operation");
static final AllowableValue WRITE_CONCERN_W1_VALUE = new AllowableValue(
WRITE_CONCERN_W1, WRITE_CONCERN_W1,
"Write operations that use this write concern will wait for acknowledgement from a single member");
static final AllowableValue WRITE_CONCERN_W2_VALUE = new AllowableValue(
WRITE_CONCERN_W2, WRITE_CONCERN_W2,
"Write operations that use this write concern will wait for acknowledgement from two members");
static final AllowableValue WRITE_CONCERN_W3_VALUE = new AllowableValue(
WRITE_CONCERN_W3, WRITE_CONCERN_W3,
"Write operations that use this write concern will wait for acknowledgement from three members");
PropertyDescriptor URI = new PropertyDescriptor.Builder()
.name("mongo-uri")
@ -85,8 +119,9 @@ public interface MongoDBClientService extends ControllerService {
.displayName("Write Concern")
.description("The write concern to use")
.required(true)
.allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
.allowableValues(WRITE_CONCERN_ACKNOWLEDGED_VALUE, WRITE_CONCERN_UNACKNOWLEDGED_VALUE, WRITE_CONCERN_FSYNCED_VALUE,
WRITE_CONCERN_JOURNALED_VALUE, WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE, WRITE_CONCERN_MAJORITY_VALUE,
WRITE_CONCERN_W1_VALUE, WRITE_CONCERN_W2_VALUE, WRITE_CONCERN_W3_VALUE)
.defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
.build();

View File

@ -28,7 +28,7 @@
<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<artifactId>mongodb-driver-legacy</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
<dependency>

View File

@ -65,6 +65,9 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
static final String WRITE_CONCERN_JOURNALED = "JOURNALED";
static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED";
static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
static final String WRITE_CONCERN_W1 = "W1";
static final String WRITE_CONCERN_W2 = "W2";
static final String WRITE_CONCERN_W3 = "W3";
protected static final String JSON_TYPE_EXTENDED = "Extended";
protected static final String JSON_TYPE_STANDARD = "Standard";
@ -145,7 +148,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.description("The write concern to use")
.required(true)
.allowableValues(WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_FSYNCED, WRITE_CONCERN_JOURNALED,
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY)
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_MAJORITY, WRITE_CONCERN_W1, WRITE_CONCERN_W2, WRITE_CONCERN_W3)
.defaultValue(WRITE_CONCERN_ACKNOWLEDGED)
.build();
@ -264,7 +267,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
protected Builder getClientOptions(final SSLContext sslContext) {
MongoClientOptions.Builder builder = MongoClientOptions.builder();
builder.sslEnabled(true);
builder.socketFactory(sslContext.getSocketFactory());
builder.sslContext(sslContext);
return builder;
}
@ -310,17 +313,28 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
writeConcern = WriteConcern.UNACKNOWLEDGED;
break;
case WRITE_CONCERN_FSYNCED:
writeConcern = WriteConcern.FSYNCED;
writeConcern = WriteConcern.JOURNALED;
getLogger().warn("Using deprecated write concern FSYNCED");
break;
case WRITE_CONCERN_JOURNALED:
writeConcern = WriteConcern.JOURNALED;
break;
case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
writeConcern = WriteConcern.W2;
getLogger().warn("Using deprecated write concern REPLICA_ACKNOWLEDGED");
break;
case WRITE_CONCERN_MAJORITY:
writeConcern = WriteConcern.MAJORITY;
break;
case WRITE_CONCERN_W1:
writeConcern = WriteConcern.W1;
break;
case WRITE_CONCERN_W2:
writeConcern = WriteConcern.W2;
break;
case WRITE_CONCERN_W3:
writeConcern = WriteConcern.W3;
break;
default:
writeConcern = WriteConcern.ACKNOWLEDGED;
}

View File

@ -141,7 +141,7 @@ public class GetMongo extends AbstractMongoQueryProcessor {
if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
asJson = getObjectWriter(objectMapper, prettyPrintSetting).writeValueAsString(document);
} else {
asJson = document.toJson(new JsonWriterSettings(true));
asJson = document.toJson(JsonWriterSettings.builder().indent(true).build());
}
builder
.append(asJson)

View File

@ -20,7 +20,7 @@ import com.mongodb.BasicDBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.util.JSON;
import com.mongodb.client.model.ReplaceOptions;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -206,7 +206,7 @@ public class PutMongo extends AbstractMongoProcessor {
// parse
final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));
? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset));
if (MODE_INSERT.equalsIgnoreCase(mode)) {
collection.insertOne((Document)doc);
@ -226,7 +226,7 @@ public class PutMongo extends AbstractMongoProcessor {
}
if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document)doc, new UpdateOptions().upsert(upsert));
collection.replaceOne(query, (Document)doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove(updateKey);
@ -288,17 +288,26 @@ public class PutMongo extends AbstractMongoProcessor {
writeConcern = WriteConcern.UNACKNOWLEDGED;
break;
case WRITE_CONCERN_FSYNCED:
writeConcern = WriteConcern.FSYNCED;
writeConcern = WriteConcern.JOURNALED;
break;
case WRITE_CONCERN_JOURNALED:
writeConcern = WriteConcern.JOURNALED;
break;
case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
writeConcern = WriteConcern.W2;
break;
case WRITE_CONCERN_MAJORITY:
writeConcern = WriteConcern.MAJORITY;
break;
case WRITE_CONCERN_W1:
writeConcern = WriteConcern.W1;
break;
case WRITE_CONCERN_W2:
writeConcern = WriteConcern.W2;
break;
case WRITE_CONCERN_W3:
writeConcern = WriteConcern.W3;
break;
default:
writeConcern = WriteConcern.ACKNOWLEDGED;
}

View File

@ -219,7 +219,7 @@ public class PutGridFS extends AbstractGridFSProcessor {
query = new Document().append("filename", fileName);
}
retVal = getDatabase(input, context).getCollection(fileColl).count(query) == 0;
retVal = getDatabase(input, context).getCollection(fileColl).countDocuments(query) == 0;
}
return retVal;

View File

@ -49,7 +49,7 @@ public class DeleteMongoIT extends MongoWriteTestBase {
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
Assertions.assertEquals(0, collection.count(Document.parse(query)),
Assertions.assertEquals(0, collection.countDocuments(Document.parse(query)),
"Found a document that should have been deleted.");
}
@ -73,8 +73,8 @@ public class DeleteMongoIT extends MongoWriteTestBase {
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
Assertions.assertEquals(0, collection.count(Document.parse(query)), "Found a document that should have been deleted.");
Assertions.assertEquals(1, collection.count(Document.parse("{}")), "One document should have been left.");
Assertions.assertEquals(0, collection.countDocuments(Document.parse(query)), "Found a document that should have been deleted.");
Assertions.assertEquals(1, collection.countDocuments(Document.parse("{}")), "One document should have been left.");
}
@Test
@ -106,7 +106,7 @@ public class DeleteMongoIT extends MongoWriteTestBase {
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 1);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 0);
Assertions.assertEquals(3, collection.count(Document.parse("{}")), "A document was deleted");
Assertions.assertEquals(3, collection.countDocuments(Document.parse("{}")), "A document was deleted");
runner.setProperty(DeleteMongo.FAIL_ON_NO_DELETE, DeleteMongo.NO_FAIL);
runner.clearTransferState();
@ -117,7 +117,7 @@ public class DeleteMongoIT extends MongoWriteTestBase {
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
Assertions.assertEquals(3, collection.count(Document.parse("{}")), "A document was deleted");
Assertions.assertEquals(3, collection.countDocuments(Document.parse("{}")), "A document was deleted");
}
@Test
@ -137,6 +137,6 @@ public class DeleteMongoIT extends MongoWriteTestBase {
runner.assertTransferCount(DeleteMongo.REL_SUCCESS, 1);
runner.assertTransferCount(DeleteMongo.REL_FAILURE, 0);
Assertions.assertEquals(0, collection.count());
Assertions.assertEquals(0, collection.countDocuments());
}
}

View File

@ -225,7 +225,7 @@ public class PutMongoIT extends MongoWriteTestBase {
assertEquals(contacts.get("twitter"), "@JohnSmith");
assertEquals(contacts.get("email"), "john.smith@test.com");
assertEquals(contacts.get("phone"), "555-555-5555");
assertEquals(collection.count(document), 1);
assertEquals(collection.countDocuments(document), 1);
}
@Test
@ -293,7 +293,7 @@ public class PutMongoIT extends MongoWriteTestBase {
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(1, collection.countDocuments());
assertEquals(doc, collection.find().first());
}
@ -313,7 +313,7 @@ public class PutMongoIT extends MongoWriteTestBase {
}
// verify 3 docs inserted into the collection
assertEquals(3, collection.count());
assertEquals(3, collection.countDocuments());
}
@Test
@ -340,7 +340,7 @@ public class PutMongoIT extends MongoWriteTestBase {
}
// verify 2 docs inserted into the collection for a total of 3
assertEquals(3, collection.count());
assertEquals(3, collection.countDocuments());
}
/**
@ -363,7 +363,7 @@ public class PutMongoIT extends MongoWriteTestBase {
out.assertContentEquals(bytes);
// nothing was in collection, so nothing to update since upsert defaults to false
assertEquals(0, collection.count());
assertEquals(0, collection.countDocuments());
}
/**
@ -387,7 +387,7 @@ public class PutMongoIT extends MongoWriteTestBase {
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(1, collection.countDocuments());
assertEquals(doc, collection.find().first());
}
@ -407,7 +407,7 @@ public class PutMongoIT extends MongoWriteTestBase {
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(1, collection.countDocuments());
assertEquals(oidDocument, collection.find().first());
}
@ -435,7 +435,7 @@ public class PutMongoIT extends MongoWriteTestBase {
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
assertEquals(1, collection.count());
assertEquals(1, collection.countDocuments());
assertEquals(doc, collection.find().first());
}
@ -518,7 +518,7 @@ public class PutMongoIT extends MongoWriteTestBase {
Document query = new Document(updateKeyProps[index], updateKeys[index]);
Document result = collection.find(query).first();
Assertions.assertNotNull(result, "Result was null");
assertEquals(1, collection.count(query), "Count was wrong");
assertEquals(1, collection.countDocuments(query), "Count was wrong");
runner.clearTransferState();
index++;
}

View File

@ -147,7 +147,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
// verify 1 doc inserted into the collection
assertEquals(5, collection.count());
assertEquals(5, collection.countDocuments());
//assertEquals(doc, collection.find().first());
@ -168,7 +168,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
assertEquals(5, collection.count());
assertEquals(5, collection.countDocuments());
}
@Test
@ -213,7 +213,7 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
// verify 1 doc inserted into the collection
assertEquals(4, collection.count());
assertEquals(4, collection.countDocuments());
//assertEquals(doc, collection.find().first());
}

View File

@ -106,7 +106,7 @@ public class PutGridFSIT extends GridFSITTestBase {
String bucketName = String.format("%s.files", BUCKET);
MongoCollection files = client.getDatabase(DB).getCollection(bucketName);
Document query = Document.parse(String.format("{\"filename\": \"%s\"}", fileName));
long count = files.count(query);
long count = files.countDocuments(query);
Assertions.assertTrue(count == 10, "Wrong count");
}

View File

@ -59,7 +59,7 @@
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<artifactId>mongodb-driver-legacy</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
<dependency>

View File

@ -97,7 +97,7 @@ public class MongoDBControllerService extends AbstractControllerService implemen
protected MongoClientOptions.Builder getClientOptions(final SSLContext sslContext) {
MongoClientOptions.Builder builder = MongoClientOptions.builder();
builder.sslEnabled(true);
builder.socketFactory(sslContext.getSocketFactory());
builder.sslContext(sslContext);
return builder;
}
@ -137,17 +137,28 @@ public class MongoDBControllerService extends AbstractControllerService implemen
writeConcern = WriteConcern.UNACKNOWLEDGED;
break;
case WRITE_CONCERN_FSYNCED:
writeConcern = WriteConcern.FSYNCED;
writeConcern = WriteConcern.JOURNALED;
getLogger().warn("Using deprecated write concern FSYNCED");
break;
case WRITE_CONCERN_JOURNALED:
writeConcern = WriteConcern.JOURNALED;
break;
case WRITE_CONCERN_REPLICA_ACKNOWLEDGED:
writeConcern = WriteConcern.REPLICA_ACKNOWLEDGED;
writeConcern = WriteConcern.W2;
getLogger().warn("Using deprecated write concern REPLICA_ACKNOWLEDGED");
break;
case WRITE_CONCERN_MAJORITY:
writeConcern = WriteConcern.MAJORITY;
break;
case WRITE_CONCERN_W1:
writeConcern = WriteConcern.W1;
break;
case WRITE_CONCERN_W2:
writeConcern = WriteConcern.W2;
break;
case WRITE_CONCERN_W3:
writeConcern = WriteConcern.W3;
break;
default:
writeConcern = WriteConcern.ACKNOWLEDGED;
}

View File

@ -35,7 +35,7 @@
</modules>
<properties>
<mongo.driver.version>3.12.7</mongo.driver.version>
<mongo.driver.version>4.3.2</mongo.driver.version>
</properties>
<dependencyManagement>