mirror of https://github.com/apache/nifi.git
NIFI-4490: Ensure driver settings are correct before connecting binlog
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2244.
This commit is contained in:
parent
9a850c7ed2
commit
6413337918
|
@ -646,6 +646,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
InetSocketAddress connectedHost = null;
|
InetSocketAddress connectedHost = null;
|
||||||
Exception lastConnectException = new Exception("Unknown connection error");
|
Exception lastConnectException = new Exception("Unknown connection error");
|
||||||
|
|
||||||
|
if (createEnrichmentConnection) {
|
||||||
|
try {
|
||||||
|
// Ensure driverLocation and driverName are correct before establishing binlog connection
|
||||||
|
// to avoid failing after binlog messages are received.
|
||||||
|
// Actual JDBC connection is created after binlog client gets started, because we need
|
||||||
|
// the connect-able host same as the binlog client.
|
||||||
|
registerDriver(driverLocation, driverName);
|
||||||
|
} catch (InitializationException e) {
|
||||||
|
throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" +
|
||||||
|
" and MySQL Driver Class Name are configured correctly. " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (connectedHost == null && connectionAttempts < numHosts) {
|
while (connectedHost == null && connectionAttempts < numHosts) {
|
||||||
if (binlogClient == null) {
|
if (binlogClient == null) {
|
||||||
|
|
||||||
|
@ -1009,6 +1022,17 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
*/
|
*/
|
||||||
protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
|
protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
|
||||||
throws InitializationException, SQLException {
|
throws InitializationException, SQLException {
|
||||||
|
Properties connectionProps = new Properties();
|
||||||
|
if (customProperties != null) {
|
||||||
|
connectionProps.putAll(customProperties);
|
||||||
|
}
|
||||||
|
connectionProps.put("user", username);
|
||||||
|
connectionProps.put("password", password);
|
||||||
|
|
||||||
|
return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void registerDriver(String locationString, String drvName) throws InitializationException {
|
||||||
if (locationString != null && locationString.length() > 0) {
|
if (locationString != null && locationString.length() > 0) {
|
||||||
try {
|
try {
|
||||||
// Split and trim the entries
|
// Split and trim the entries
|
||||||
|
@ -1027,20 +1051,14 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
final Driver driver = (Driver) clazz.newInstance();
|
final Driver driver = (Driver) clazz.newInstance();
|
||||||
DriverManager.registerDriver(new DriverShim(driver));
|
DriverManager.registerDriver(new DriverShim(driver));
|
||||||
|
|
||||||
|
} catch (final InitializationException e) {
|
||||||
|
throw e;
|
||||||
} catch (final MalformedURLException e) {
|
} catch (final MalformedURLException e) {
|
||||||
throw new InitializationException("Invalid Database Driver Jar Url", e);
|
throw new InitializationException("Invalid Database Driver Jar Url", e);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new InitializationException("Can't load Database Driver", e);
|
throw new InitializationException("Can't load Database Driver", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Properties connectionProps = new Properties();
|
|
||||||
if (customProperties != null) {
|
|
||||||
connectionProps.putAll(customProperties);
|
|
||||||
}
|
|
||||||
connectionProps.put("user", username);
|
|
||||||
connectionProps.put("password", password);
|
|
||||||
|
|
||||||
return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class DriverShim implements Driver {
|
private static class DriverShim implements Driver {
|
||||||
|
|
|
@ -962,6 +962,10 @@ class CaptureChangeMySQLTest {
|
||||||
return tableInfo
|
return tableInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void registerDriver(String locationString, String drvName) throws InitializationException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
|
protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
|
||||||
throws InitializationException, SQLException {
|
throws InitializationException, SQLException {
|
||||||
|
|
Loading…
Reference in New Issue