diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index f7f075e9a5..5ee51fd27d 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -646,6 +646,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { InetSocketAddress connectedHost = null; Exception lastConnectException = new Exception("Unknown connection error"); + if (createEnrichmentConnection) { + try { + // Ensure driverLocation and driverName are correct before establishing binlog connection + // to avoid failing after binlog messages are received. + // Actual JDBC connection is created after binlog client gets started, because we need + // the connect-able host same as the binlog client. + registerDriver(driverLocation, driverName); + } catch (InitializationException e) { + throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" + + " and MySQL Driver Class Name are configured correctly. " + e, e); + } + } + while (connectedHost == null && connectionAttempts < numHosts) { if (binlogClient == null) { @@ -1009,6 +1022,17 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { */ protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map customProperties) throws InitializationException, SQLException { + Properties connectionProps = new Properties(); + if (customProperties != null) { + connectionProps.putAll(customProperties); + } + connectionProps.put("user", username); + connectionProps.put("password", password); + + return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps); + } + + protected void registerDriver(String locationString, String drvName) throws InitializationException { if (locationString != null && locationString.length() > 0) { try { // Split and trim the entries @@ -1027,20 +1051,14 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { final Driver driver = (Driver) clazz.newInstance(); DriverManager.registerDriver(new DriverShim(driver)); + } catch (final InitializationException e) { + throw e; } catch (final MalformedURLException e) { throw new InitializationException("Invalid Database Driver Jar Url", e); } catch (final Exception e) { throw new InitializationException("Can't load Database Driver", e); } } - Properties connectionProps = new Properties(); - if (customProperties != null) { - connectionProps.putAll(customProperties); - } - connectionProps.put("user", username); - connectionProps.put("password", password); - - return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps); } private static class DriverShim implements Driver { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index f876b6ae93..7e8607da63 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -962,6 +962,10 @@ class CaptureChangeMySQLTest { return tableInfo } + @Override + protected void registerDriver(String locationString, String drvName) throws InitializationException { + } + @Override protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map customProperties) throws InitializationException, SQLException {