diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index ca2f185f46..cc9d86137c 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -62,6 +62,8 @@ import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter; import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter; import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter; import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory; +import org.apache.nifi.cdc.mysql.processors.ssl.ConnectionPropertiesProvider; +import org.apache.nifi.cdc.mysql.processors.ssl.StandardConnectionPropertiesProvider; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -85,6 +87,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; @@ -840,7 +843,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } if (createEnrichmentConnection) { - jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, null, connectTimeout); + final TlsConfiguration tlsConfiguration = sslContextService == null ? null : sslContextService.createTlsConfiguration(); + final ConnectionPropertiesProvider connectionPropertiesProvider = new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration); + final Map jdbcConnectionProperties = connectionPropertiesProvider.getConnectionProperties(); + jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, jdbcConnectionProperties, connectTimeout); try { // Ensure connection can be created. getJdbcConnection(); @@ -1218,9 +1224,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map customProperties, long connectionTimeoutMillis) { this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort(); - if (customProperties != null) { - connectionProps.putAll(customProperties); - } + connectionProps.putAll(customProperties); connectionProps.put("user", username); connectionProps.put("password", password); this.connectionTimeoutMillis = connectionTimeoutMillis; @@ -1252,7 +1256,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } } - /** * using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR. * diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/ConnectionPropertiesProvider.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/ConnectionPropertiesProvider.java new file mode 100644 index 0000000000..50dd81622b --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/ConnectionPropertiesProvider.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.processors.ssl; + +import java.util.Map; + +/** + * JDBC Connection Properties Provider + */ +public interface ConnectionPropertiesProvider { + /** + * Get Connection Properties + * + * @return JDBC Connection Properties + */ + Map getConnectionProperties(); +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/SecurityProperty.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/SecurityProperty.java new file mode 100644 index 0000000000..82d8027c1d --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/SecurityProperty.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.processors.ssl; + +/** + * MySQL Connector/J Security Properties + */ +public enum SecurityProperty { + /** Deprecated alias for tlsVersions */ + ENABLED_TLS_PROTOCOLS("enabledTLSProtocols"), + + /** Added in MySQL 5.1.0 */ + TRUST_CERTIFICATE_KEY_STORE_URL("trustsCertificateKeyStoreUrl"), + + /** Added in MySQL 5.1.0 and defaults to JKS */ + TRUST_CERTIFICATE_KEY_STORE_TYPE("trustCertificateKeyStoreType"), + + /** Added in MySQL 5.1.0 */ + TRUST_CERTIFICATE_KEY_STORE_PASSWORD("trustCertificateKeyStorePassword"), + + /** Added in MySQL 5.1.0 */ + CLIENT_CERTIFICATE_KEY_STORE_URL("clientCertificateKeyStoreUrl"), + + /** Added in MySQL 5.1.0 and defaults to JKS */ + CLIENT_CERTIFICATE_KEY_STORE_TYPE("clientCertificateKeyStoreType"), + + /** Added in MySQL 5.1.0 */ + CLIENT_CERTIFICATE_KEY_STORE_PASSWORD("clientCertificateKeyStorePassword"), + + /** Deprecated in favor of sslMode and evaluated when useSSL is enabled */ + REQUIRE_SSL("requireSSL"), + + /** Deprecated in favor of sslMode and defaults to true in 8.0.13 and later */ + USE_SSL("useSSL"), + + /** Deprecated in favor of sslMode and defaults to false in 8.0.13 and later */ + VERIFY_SERVER_CERTIFICATE("verifyServerCertificate"); + + private final String property; + + SecurityProperty(final String property) { + this.property = property; + } + + public String getProperty() { + return property; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/StandardConnectionPropertiesProvider.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/StandardConnectionPropertiesProvider.java new file mode 100644 index 0000000000..e82734b6db --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/StandardConnectionPropertiesProvider.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.processors.ssl; + +import com.github.shyiko.mysql.binlog.network.SSLMode; +import org.apache.nifi.security.util.TlsConfiguration; +import org.apache.nifi.security.util.TlsPlatform; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Standard implementation of Connection Properties Provider + */ +public class StandardConnectionPropertiesProvider implements ConnectionPropertiesProvider { + private static final String COMMA_SEPARATOR = ","; + + private final SSLMode sslMode; + + private final TlsConfiguration tlsConfiguration; + + public StandardConnectionPropertiesProvider( + final SSLMode sslMode, + final TlsConfiguration tlsConfiguration + ) { + this.sslMode = Objects.requireNonNull(sslMode, "SSL Mode required"); + this.tlsConfiguration = tlsConfiguration; + } + + /** + * Get Connection Properties based on SSL Mode and TLS Configuration + * + * @return JDBC Connection Properties + */ + @Override + public Map getConnectionProperties() { + final Map properties = new LinkedHashMap<>(); + + if (SSLMode.DISABLED == sslMode) { + properties.put(SecurityProperty.USE_SSL.getProperty(), Boolean.FALSE.toString()); + } else { + // Enable TLS negotiation for all modes + properties.put(SecurityProperty.USE_SSL.getProperty(), Boolean.TRUE.toString()); + + if (SSLMode.PREFERRED == sslMode) { + properties.put(SecurityProperty.REQUIRE_SSL.getProperty(), Boolean.FALSE.toString()); + } else { + // Modes other than preferred require SSL + properties.put(SecurityProperty.REQUIRE_SSL.getProperty(), Boolean.TRUE.toString()); + } + + if (SSLMode.VERIFY_IDENTITY == sslMode) { + properties.put(SecurityProperty.VERIFY_SERVER_CERTIFICATE.getProperty(), Boolean.TRUE.toString()); + } + + if (tlsConfiguration == null) { + // Set preferred protocols based on Java platform configuration + final String protocols = TlsPlatform.getPreferredProtocols().stream().collect(Collectors.joining(COMMA_SEPARATOR)); + properties.put(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty(), protocols); + } else { + final Map certificateProperties = getCertificateProperties(); + properties.putAll(certificateProperties); + } + } + + return properties; + } + + private Map getCertificateProperties() { + final Map properties = new LinkedHashMap<>(); + + final String protocols = Arrays.stream(tlsConfiguration.getEnabledProtocols()).collect(Collectors.joining(COMMA_SEPARATOR)); + properties.put(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty(), protocols); + + if (tlsConfiguration.isKeystorePopulated()) { + properties.put(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_URL.getProperty(), tlsConfiguration.getKeystorePath()); + properties.put(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_TYPE.getProperty(), tlsConfiguration.getKeystoreType().getType()); + properties.put(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_PASSWORD.getProperty(), tlsConfiguration.getKeystorePassword()); + } + + if (tlsConfiguration.isTruststorePopulated()) { + properties.put(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_URL.getProperty(), tlsConfiguration.getTruststorePath()); + properties.put(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_TYPE.getProperty(), tlsConfiguration.getTruststoreType().getType()); + properties.put(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_PASSWORD.getProperty(), tlsConfiguration.getTruststorePassword()); + } + + return properties; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/StandardConnectionPropertiesProviderTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/StandardConnectionPropertiesProviderTest.java new file mode 100644 index 0000000000..3784067ce2 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/StandardConnectionPropertiesProviderTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.processors.ssl; + +import com.github.shyiko.mysql.binlog.network.SSLMode; +import org.apache.nifi.security.util.KeystoreType; +import org.apache.nifi.security.util.TlsConfiguration; +import org.apache.nifi.security.util.TlsPlatform; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class StandardConnectionPropertiesProviderTest { + private static final String KEY_STORE_PATH = "keystore.p12"; + + private static final KeystoreType KEY_STORE_TYPE = KeystoreType.PKCS12; + + private static final String KEY_STORE_PASSWORD = String.class.getName(); + + private static final String TRUST_STORE_PATH = "cacerts"; + + private static final KeystoreType TRUST_STORE_TYPE = KeystoreType.PKCS12; + + private static final String TRUST_STORE_PASSWORD = Integer.class.getName(); + + @Mock + TlsConfiguration tlsConfiguration; + + @Test + void testGetConnectionPropertiesSslModeDisabled() { + final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.DISABLED, null); + + final Map properties = provider.getConnectionProperties(); + + assertNotNull(properties); + + final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty()); + assertEquals(Boolean.FALSE.toString(), useSsl); + } + + @Test + void testGetConnectionPropertiesSslModePreferred() { + final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.PREFERRED, null); + + final Map properties = provider.getConnectionProperties(); + + assertNotNull(properties); + + final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), useSsl); + + final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty()); + assertEquals(Boolean.FALSE.toString(), requireSsl); + + final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty()); + assertNotNull(protocols); + } + + @Test + void testGetConnectionPropertiesSslModeRequired() { + final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.REQUIRED, null); + + final Map properties = provider.getConnectionProperties(); + + assertNotNull(properties); + + final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), useSsl); + + final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), requireSsl); + + final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty()); + assertNotNull(protocols); + } + + @Test + void testGetConnectionPropertiesSslModeVerifyIdentity() { + final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.VERIFY_IDENTITY, null); + + final Map properties = provider.getConnectionProperties(); + + assertNotNull(properties); + + final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), useSsl); + + final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), requireSsl); + + final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty()); + assertNotNull(protocols); + + final String verifyServerCertificate = properties.get(SecurityProperty.VERIFY_SERVER_CERTIFICATE.getProperty()); + assertEquals(Boolean.TRUE.toString(), verifyServerCertificate); + } + + @Test + void testGetConnectionPropertiesSslModeRequiredTlsConfiguration() { + final String latestProtocol = TlsPlatform.getLatestProtocol(); + when(tlsConfiguration.getEnabledProtocols()).thenReturn(new String[]{latestProtocol}); + when(tlsConfiguration.isKeystorePopulated()).thenReturn(true); + when(tlsConfiguration.getKeystorePath()).thenReturn(KEY_STORE_PATH); + when(tlsConfiguration.getKeystoreType()).thenReturn(KEY_STORE_TYPE); + when(tlsConfiguration.getKeystorePassword()).thenReturn(KEY_STORE_PASSWORD); + when(tlsConfiguration.isTruststorePopulated()).thenReturn(true); + when(tlsConfiguration.getTruststorePath()).thenReturn(TRUST_STORE_PATH); + when(tlsConfiguration.getTruststoreType()).thenReturn(TRUST_STORE_TYPE); + when(tlsConfiguration.getTruststorePassword()).thenReturn(TRUST_STORE_PASSWORD); + + final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.REQUIRED, tlsConfiguration); + + final Map properties = provider.getConnectionProperties(); + + assertNotNull(properties); + + final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), useSsl); + + final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty()); + assertEquals(Boolean.TRUE.toString(), requireSsl); + + final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty()); + assertEquals(latestProtocol, protocols); + + final String clientCertificateUrl = properties.get(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_URL.getProperty()); + assertEquals(KEY_STORE_PATH, clientCertificateUrl); + + final String clientCertificateType = properties.get(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_TYPE.getProperty()); + assertEquals(KEY_STORE_TYPE.getType(), clientCertificateType); + + final String clientCertificatePassword = properties.get(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_PASSWORD.getProperty()); + assertEquals(KEY_STORE_PASSWORD, clientCertificatePassword); + + final String trustCertificateUrl = properties.get(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_URL.getProperty()); + assertEquals(TRUST_STORE_PATH, trustCertificateUrl); + + final String trustCertificateType = properties.get(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_TYPE.getProperty()); + assertEquals(TRUST_STORE_TYPE.getType(), trustCertificateType); + + final String trustCertificatePassword = properties.get(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_PASSWORD.getProperty()); + assertEquals(TRUST_STORE_PASSWORD, trustCertificatePassword); + } +}