mirror of https://github.com/apache/nifi.git
NIFI-10358 Updated CaptureChangeMySQL to use SSL JDBC properties
- Added Connection Properties Provider with implementation to translate SSL Mode and SSL Context Service configuration to MySQL Connector properties Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #6306
This commit is contained in:
parent
21503f6353
commit
5df6efa0f2
|
@ -62,6 +62,8 @@ import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
|
|||
import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
|
||||
import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
|
||||
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory;
|
||||
import org.apache.nifi.cdc.mysql.processors.ssl.ConnectionPropertiesProvider;
|
||||
import org.apache.nifi.cdc.mysql.processors.ssl.StandardConnectionPropertiesProvider;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
|
@ -85,6 +87,7 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
|
||||
|
||||
|
@ -840,7 +843,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
}
|
||||
|
||||
if (createEnrichmentConnection) {
|
||||
jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, null, connectTimeout);
|
||||
final TlsConfiguration tlsConfiguration = sslContextService == null ? null : sslContextService.createTlsConfiguration();
|
||||
final ConnectionPropertiesProvider connectionPropertiesProvider = new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
|
||||
final Map<String, String> jdbcConnectionProperties = connectionPropertiesProvider.getConnectionProperties();
|
||||
jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, jdbcConnectionProperties, connectTimeout);
|
||||
try {
|
||||
// Ensure connection can be created.
|
||||
getJdbcConnection();
|
||||
|
@ -1218,9 +1224,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
|
||||
private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) {
|
||||
this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort();
|
||||
if (customProperties != null) {
|
||||
connectionProps.putAll(customProperties);
|
||||
}
|
||||
connectionProps.putAll(customProperties);
|
||||
connectionProps.put("user", username);
|
||||
connectionProps.put("password", password);
|
||||
this.connectionTimeoutMillis = connectionTimeoutMillis;
|
||||
|
@ -1252,7 +1256,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cdc.mysql.processors.ssl;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* JDBC Connection Properties Provider
|
||||
*/
|
||||
public interface ConnectionPropertiesProvider {
|
||||
/**
|
||||
* Get Connection Properties
|
||||
*
|
||||
* @return JDBC Connection Properties
|
||||
*/
|
||||
Map<String, String> getConnectionProperties();
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cdc.mysql.processors.ssl;
|
||||
|
||||
/**
|
||||
* MySQL Connector/J Security Properties
|
||||
*/
|
||||
public enum SecurityProperty {
|
||||
/** Deprecated alias for tlsVersions */
|
||||
ENABLED_TLS_PROTOCOLS("enabledTLSProtocols"),
|
||||
|
||||
/** Added in MySQL 5.1.0 */
|
||||
TRUST_CERTIFICATE_KEY_STORE_URL("trustsCertificateKeyStoreUrl"),
|
||||
|
||||
/** Added in MySQL 5.1.0 and defaults to JKS */
|
||||
TRUST_CERTIFICATE_KEY_STORE_TYPE("trustCertificateKeyStoreType"),
|
||||
|
||||
/** Added in MySQL 5.1.0 */
|
||||
TRUST_CERTIFICATE_KEY_STORE_PASSWORD("trustCertificateKeyStorePassword"),
|
||||
|
||||
/** Added in MySQL 5.1.0 */
|
||||
CLIENT_CERTIFICATE_KEY_STORE_URL("clientCertificateKeyStoreUrl"),
|
||||
|
||||
/** Added in MySQL 5.1.0 and defaults to JKS */
|
||||
CLIENT_CERTIFICATE_KEY_STORE_TYPE("clientCertificateKeyStoreType"),
|
||||
|
||||
/** Added in MySQL 5.1.0 */
|
||||
CLIENT_CERTIFICATE_KEY_STORE_PASSWORD("clientCertificateKeyStorePassword"),
|
||||
|
||||
/** Deprecated in favor of sslMode and evaluated when useSSL is enabled */
|
||||
REQUIRE_SSL("requireSSL"),
|
||||
|
||||
/** Deprecated in favor of sslMode and defaults to true in 8.0.13 and later */
|
||||
USE_SSL("useSSL"),
|
||||
|
||||
/** Deprecated in favor of sslMode and defaults to false in 8.0.13 and later */
|
||||
VERIFY_SERVER_CERTIFICATE("verifyServerCertificate");
|
||||
|
||||
private final String property;
|
||||
|
||||
SecurityProperty(final String property) {
|
||||
this.property = property;
|
||||
}
|
||||
|
||||
public String getProperty() {
|
||||
return property;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cdc.mysql.processors.ssl;
|
||||
|
||||
import com.github.shyiko.mysql.binlog.network.SSLMode;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.security.util.TlsPlatform;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Standard implementation of Connection Properties Provider
|
||||
*/
|
||||
public class StandardConnectionPropertiesProvider implements ConnectionPropertiesProvider {
|
||||
private static final String COMMA_SEPARATOR = ",";
|
||||
|
||||
private final SSLMode sslMode;
|
||||
|
||||
private final TlsConfiguration tlsConfiguration;
|
||||
|
||||
public StandardConnectionPropertiesProvider(
|
||||
final SSLMode sslMode,
|
||||
final TlsConfiguration tlsConfiguration
|
||||
) {
|
||||
this.sslMode = Objects.requireNonNull(sslMode, "SSL Mode required");
|
||||
this.tlsConfiguration = tlsConfiguration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Connection Properties based on SSL Mode and TLS Configuration
|
||||
*
|
||||
* @return JDBC Connection Properties
|
||||
*/
|
||||
@Override
|
||||
public Map<String, String> getConnectionProperties() {
|
||||
final Map<String, String> properties = new LinkedHashMap<>();
|
||||
|
||||
if (SSLMode.DISABLED == sslMode) {
|
||||
properties.put(SecurityProperty.USE_SSL.getProperty(), Boolean.FALSE.toString());
|
||||
} else {
|
||||
// Enable TLS negotiation for all modes
|
||||
properties.put(SecurityProperty.USE_SSL.getProperty(), Boolean.TRUE.toString());
|
||||
|
||||
if (SSLMode.PREFERRED == sslMode) {
|
||||
properties.put(SecurityProperty.REQUIRE_SSL.getProperty(), Boolean.FALSE.toString());
|
||||
} else {
|
||||
// Modes other than preferred require SSL
|
||||
properties.put(SecurityProperty.REQUIRE_SSL.getProperty(), Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
if (SSLMode.VERIFY_IDENTITY == sslMode) {
|
||||
properties.put(SecurityProperty.VERIFY_SERVER_CERTIFICATE.getProperty(), Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
if (tlsConfiguration == null) {
|
||||
// Set preferred protocols based on Java platform configuration
|
||||
final String protocols = TlsPlatform.getPreferredProtocols().stream().collect(Collectors.joining(COMMA_SEPARATOR));
|
||||
properties.put(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty(), protocols);
|
||||
} else {
|
||||
final Map<String, String> certificateProperties = getCertificateProperties();
|
||||
properties.putAll(certificateProperties);
|
||||
}
|
||||
}
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
private Map<String, String> getCertificateProperties() {
|
||||
final Map<String, String> properties = new LinkedHashMap<>();
|
||||
|
||||
final String protocols = Arrays.stream(tlsConfiguration.getEnabledProtocols()).collect(Collectors.joining(COMMA_SEPARATOR));
|
||||
properties.put(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty(), protocols);
|
||||
|
||||
if (tlsConfiguration.isKeystorePopulated()) {
|
||||
properties.put(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_URL.getProperty(), tlsConfiguration.getKeystorePath());
|
||||
properties.put(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_TYPE.getProperty(), tlsConfiguration.getKeystoreType().getType());
|
||||
properties.put(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_PASSWORD.getProperty(), tlsConfiguration.getKeystorePassword());
|
||||
}
|
||||
|
||||
if (tlsConfiguration.isTruststorePopulated()) {
|
||||
properties.put(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_URL.getProperty(), tlsConfiguration.getTruststorePath());
|
||||
properties.put(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_TYPE.getProperty(), tlsConfiguration.getTruststoreType().getType());
|
||||
properties.put(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_PASSWORD.getProperty(), tlsConfiguration.getTruststorePassword());
|
||||
}
|
||||
|
||||
return properties;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.cdc.mysql.processors.ssl;
|
||||
|
||||
import com.github.shyiko.mysql.binlog.network.SSLMode;
|
||||
import org.apache.nifi.security.util.KeystoreType;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.security.util.TlsPlatform;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class StandardConnectionPropertiesProviderTest {
|
||||
private static final String KEY_STORE_PATH = "keystore.p12";
|
||||
|
||||
private static final KeystoreType KEY_STORE_TYPE = KeystoreType.PKCS12;
|
||||
|
||||
private static final String KEY_STORE_PASSWORD = String.class.getName();
|
||||
|
||||
private static final String TRUST_STORE_PATH = "cacerts";
|
||||
|
||||
private static final KeystoreType TRUST_STORE_TYPE = KeystoreType.PKCS12;
|
||||
|
||||
private static final String TRUST_STORE_PASSWORD = Integer.class.getName();
|
||||
|
||||
@Mock
|
||||
TlsConfiguration tlsConfiguration;
|
||||
|
||||
@Test
|
||||
void testGetConnectionPropertiesSslModeDisabled() {
|
||||
final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.DISABLED, null);
|
||||
|
||||
final Map<String, String> properties = provider.getConnectionProperties();
|
||||
|
||||
assertNotNull(properties);
|
||||
|
||||
final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty());
|
||||
assertEquals(Boolean.FALSE.toString(), useSsl);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetConnectionPropertiesSslModePreferred() {
|
||||
final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.PREFERRED, null);
|
||||
|
||||
final Map<String, String> properties = provider.getConnectionProperties();
|
||||
|
||||
assertNotNull(properties);
|
||||
|
||||
final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), useSsl);
|
||||
|
||||
final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty());
|
||||
assertEquals(Boolean.FALSE.toString(), requireSsl);
|
||||
|
||||
final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty());
|
||||
assertNotNull(protocols);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetConnectionPropertiesSslModeRequired() {
|
||||
final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.REQUIRED, null);
|
||||
|
||||
final Map<String, String> properties = provider.getConnectionProperties();
|
||||
|
||||
assertNotNull(properties);
|
||||
|
||||
final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), useSsl);
|
||||
|
||||
final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), requireSsl);
|
||||
|
||||
final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty());
|
||||
assertNotNull(protocols);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetConnectionPropertiesSslModeVerifyIdentity() {
|
||||
final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.VERIFY_IDENTITY, null);
|
||||
|
||||
final Map<String, String> properties = provider.getConnectionProperties();
|
||||
|
||||
assertNotNull(properties);
|
||||
|
||||
final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), useSsl);
|
||||
|
||||
final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), requireSsl);
|
||||
|
||||
final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty());
|
||||
assertNotNull(protocols);
|
||||
|
||||
final String verifyServerCertificate = properties.get(SecurityProperty.VERIFY_SERVER_CERTIFICATE.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), verifyServerCertificate);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetConnectionPropertiesSslModeRequiredTlsConfiguration() {
|
||||
final String latestProtocol = TlsPlatform.getLatestProtocol();
|
||||
when(tlsConfiguration.getEnabledProtocols()).thenReturn(new String[]{latestProtocol});
|
||||
when(tlsConfiguration.isKeystorePopulated()).thenReturn(true);
|
||||
when(tlsConfiguration.getKeystorePath()).thenReturn(KEY_STORE_PATH);
|
||||
when(tlsConfiguration.getKeystoreType()).thenReturn(KEY_STORE_TYPE);
|
||||
when(tlsConfiguration.getKeystorePassword()).thenReturn(KEY_STORE_PASSWORD);
|
||||
when(tlsConfiguration.isTruststorePopulated()).thenReturn(true);
|
||||
when(tlsConfiguration.getTruststorePath()).thenReturn(TRUST_STORE_PATH);
|
||||
when(tlsConfiguration.getTruststoreType()).thenReturn(TRUST_STORE_TYPE);
|
||||
when(tlsConfiguration.getTruststorePassword()).thenReturn(TRUST_STORE_PASSWORD);
|
||||
|
||||
final StandardConnectionPropertiesProvider provider = new StandardConnectionPropertiesProvider(SSLMode.REQUIRED, tlsConfiguration);
|
||||
|
||||
final Map<String, String> properties = provider.getConnectionProperties();
|
||||
|
||||
assertNotNull(properties);
|
||||
|
||||
final String useSsl = properties.get(SecurityProperty.USE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), useSsl);
|
||||
|
||||
final String requireSsl = properties.get(SecurityProperty.REQUIRE_SSL.getProperty());
|
||||
assertEquals(Boolean.TRUE.toString(), requireSsl);
|
||||
|
||||
final String protocols = properties.get(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty());
|
||||
assertEquals(latestProtocol, protocols);
|
||||
|
||||
final String clientCertificateUrl = properties.get(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_URL.getProperty());
|
||||
assertEquals(KEY_STORE_PATH, clientCertificateUrl);
|
||||
|
||||
final String clientCertificateType = properties.get(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_TYPE.getProperty());
|
||||
assertEquals(KEY_STORE_TYPE.getType(), clientCertificateType);
|
||||
|
||||
final String clientCertificatePassword = properties.get(SecurityProperty.CLIENT_CERTIFICATE_KEY_STORE_PASSWORD.getProperty());
|
||||
assertEquals(KEY_STORE_PASSWORD, clientCertificatePassword);
|
||||
|
||||
final String trustCertificateUrl = properties.get(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_URL.getProperty());
|
||||
assertEquals(TRUST_STORE_PATH, trustCertificateUrl);
|
||||
|
||||
final String trustCertificateType = properties.get(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_TYPE.getProperty());
|
||||
assertEquals(TRUST_STORE_TYPE.getType(), trustCertificateType);
|
||||
|
||||
final String trustCertificatePassword = properties.get(SecurityProperty.TRUST_CERTIFICATE_KEY_STORE_PASSWORD.getProperty());
|
||||
assertEquals(TRUST_STORE_PASSWORD, trustCertificatePassword);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue