NIFI-10781 Made the MongoDB controller service implement VerifiableControllerService.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6635
This commit is contained in:
Mike Thomsen 2022-11-08 15:57:39 -05:00 committed by Matthew Burgess
parent 01816da56f
commit ddfaf16f68
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 98 additions and 24 deletions

View File

@ -23,13 +23,14 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
public interface MongoDBClientService extends ControllerService {
public interface MongoDBClientService extends ControllerService, VerifiableControllerService {
String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
String WRITE_CONCERN_FSYNCED = "FSYNCED";
@ -40,33 +41,33 @@ public interface MongoDBClientService extends ControllerService {
String WRITE_CONCERN_W2 = "W2";
String WRITE_CONCERN_W3 = "W3";
static final AllowableValue WRITE_CONCERN_ACKNOWLEDGED_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_ACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_ACKNOWLEDGED,
"Write operations that use this write concern will wait for acknowledgement, " +
"using the default write concern configured on the server");
static final AllowableValue WRITE_CONCERN_UNACKNOWLEDGED_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_UNACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED,
"Write operations that use this write concern will return as soon as the message is written to the socket. " +
"Exceptions are raised for network issues, but not server errors");
static final AllowableValue WRITE_CONCERN_FSYNCED_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_FSYNCED_VALUE = new AllowableValue(
WRITE_CONCERN_FSYNCED, WRITE_CONCERN_FSYNCED,
"Deprecated. Use of \"" + WRITE_CONCERN_JOURNALED + "\" is preferred");
static final AllowableValue WRITE_CONCERN_JOURNALED_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_JOURNALED_VALUE = new AllowableValue(
WRITE_CONCERN_JOURNALED, WRITE_CONCERN_JOURNALED,
"Write operations wait for the server to group commit to the journal file on disk");
static final AllowableValue WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_REPLICA_ACKNOWLEDGED, WRITE_CONCERN_REPLICA_ACKNOWLEDGED,
"Deprecated. Use of \"" + WRITE_CONCERN_W2 + "\" is preferred");
static final AllowableValue WRITE_CONCERN_MAJORITY_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_MAJORITY_VALUE = new AllowableValue(
WRITE_CONCERN_MAJORITY, WRITE_CONCERN_MAJORITY,
"Exceptions are raised for network issues, and server errors; waits on a majority of servers for the write operation");
static final AllowableValue WRITE_CONCERN_W1_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_W1_VALUE = new AllowableValue(
WRITE_CONCERN_W1, WRITE_CONCERN_W1,
"Write operations that use this write concern will wait for acknowledgement from a single member");
static final AllowableValue WRITE_CONCERN_W2_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_W2_VALUE = new AllowableValue(
WRITE_CONCERN_W2, WRITE_CONCERN_W2,
"Write operations that use this write concern will wait for acknowledgement from two members");
static final AllowableValue WRITE_CONCERN_W3_VALUE = new AllowableValue(
AllowableValue WRITE_CONCERN_W3_VALUE = new AllowableValue(
WRITE_CONCERN_W3, WRITE_CONCERN_W3,
"Write operations that use this write concern will wait for acknowledgement from three members");

View File

@ -23,20 +23,25 @@ import com.mongodb.MongoClientURI;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
@ -49,7 +54,7 @@ public class MongoDBControllerService extends AbstractControllerService implemen
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.uri = getURI(context);
this.createClient(context);
this.mongoClient = createClient(context, this.mongoClient);
}
static List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -65,9 +70,9 @@ public class MongoDBControllerService extends AbstractControllerService implemen
protected MongoClient mongoClient;
// TODO: Remove duplicate code by refactoring shared method to accept PropertyContext
protected final void createClient(ConfigurationContext context) {
if (mongoClient != null) {
closeClient();
protected MongoClient createClient(ConfigurationContext context, MongoClient existing) {
if (existing != null) {
closeClient(existing);
}
getLogger().info("Creating MongoClient");
@ -84,9 +89,9 @@ public class MongoDBControllerService extends AbstractControllerService implemen
try {
if(sslContext == null) {
mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
return new MongoClient(new MongoClientURI(getURI(context)));
} else {
mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
return new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
}
} catch (Exception e) {
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
@ -102,10 +107,13 @@ public class MongoDBControllerService extends AbstractControllerService implemen
}
@OnStopped
public final void closeClient() {
if (mongoClient != null) {
mongoClient.close();
mongoClient = null;
public final void onStopped() {
closeClient(mongoClient);
}
private void closeClient(MongoClient client) {
if (client != null) {
client.close();
}
}
@ -185,4 +193,28 @@ public class MongoDBControllerService extends AbstractControllerService implemen
public String getURI() {
return uri;
}
@Override
public List<ConfigVerificationResult> verify(ConfigurationContext context,
ComponentLog verificationLogger,
Map<String, String> variables) {
ConfigVerificationResult.Builder connectionSuccessful = new ConfigVerificationResult.Builder()
.verificationStepName("Connection test");
MongoClient client = null;
try {
client = createClient(context, null);
MongoDatabase db = client.getDatabase("test");
db.runCommand(new Document("buildInfo", 1));
connectionSuccessful.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
} catch (Exception ex) {
connectionSuccessful
.explanation(ex.getMessage())
.outcome(ConfigVerificationResult.Outcome.FAILED);
} finally {
closeClient(client);
}
return Arrays.asList(connectionSuccessful.build());
}
}

View File

@ -17,17 +17,27 @@
package org.apache.nifi.mongodb;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceLookup;
import org.apache.nifi.util.MockVariableRegistry;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MongoDBControllerServiceIT {
private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
private static final String COL_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
private static final String IDENTIFIER = "Client Service";
private TestRunner runner;
private MongoDBControllerService service;
@ -36,7 +46,7 @@ public class MongoDBControllerServiceIT {
public void before() throws Exception {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
service = new MongoDBControllerService();
runner.addControllerService("Client Service", service);
runner.addControllerService(IDENTIFIER, service);
runner.setProperty(service, MongoDBControllerService.URI, "mongodb://localhost:27017");
runner.enableControllerService(service);
}
@ -50,4 +60,35 @@ public class MongoDBControllerServiceIT {
public void testInit() throws Exception {
runner.assertValid(service);
}
private Map<PropertyDescriptor, String> getClientServiceProperties() {
return ((MockControllerServiceLookup) runner.getProcessContext().getControllerServiceLookup())
.getControllerServices().get(IDENTIFIER).getProperties();
}
@Test
public void testVerifyWithCorrectConnectionString() {
final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(1, results.size());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
}
@Test
public void testVerifyWithIncorrectConnectionString() {
runner.disableControllerService(service);
runner.setProperty(service, MongoDBControllerService.URI, "mongodb://localhost:2701");
final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(1, results.size());
assertEquals(ConfigVerificationResult.Outcome.FAILED, results.get(0).getOutcome());
}
}