diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 96be0c9693..c328835fcd 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -640,6 +640,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { int connectionAttempts = 0; final int numHosts = hosts.size(); InetSocketAddress connectedHost = null; + Exception lastConnectException = new Exception("Unknown connection error"); while (connectedHost == null && connectionAttempts < numHosts) { if (binlogClient == null) { @@ -682,11 +683,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { transitUri = ""; currentHost = (currentHost + 1) % numHosts; connectionAttempts++; + lastConnectException = te; } } if (!binlogClient.isConnected()) { binlogClient = null; - throw new IOException("Could not connect binlog client to any of the specified hosts"); + throw new IOException("Could not connect binlog client to any of the specified hosts due to: " + lastConnectException.getMessage(), lastConnectException); } if (createEnrichmentConnection) { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 8106e40d19..a8edb3ac91 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -44,6 +44,7 @@ import org.apache.nifi.cdc.event.ColumnDefinition import org.apache.nifi.cdc.event.TableInfo import org.apache.nifi.cdc.event.TableInfoCacheKey import org.apache.nifi.cdc.event.io.EventWriter +import org.apache.nifi.processor.exception.ProcessException import org.apache.nifi.provenance.ProvenanceEventType import org.apache.nifi.reporting.InitializationException import org.apache.nifi.state.MockStateManager @@ -59,6 +60,7 @@ import java.sql.Connection import java.sql.ResultSet import java.sql.SQLException import java.sql.Statement +import java.util.concurrent.TimeoutException import java.util.regex.Matcher import java.util.regex.Pattern @@ -88,6 +90,38 @@ class CaptureChangeMySQLTest { } + @Test + void testConnectionFailures() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar') + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') + client.connectionError = true + try { + testRunner.run() + } catch (AssertionError ae) { + def pe = ae.getCause() + assertTrue(pe instanceof ProcessException) + def ioe = pe.getCause() + assertTrue(ioe instanceof IOException) + assertEquals('Could not connect binlog client to any of the specified hosts due to: Error during connect', ioe.getMessage()) + assertTrue(ioe.getCause() instanceof IOException) + } + client.connectionError = false + + client.connectionTimeout = true + try { + testRunner.run() + } catch (AssertionError ae) { + def pe = ae.getCause() + assertTrue(pe instanceof ProcessException) + def ioe = pe.getCause() + assertTrue(ioe instanceof IOException) + assertEquals('Could not connect binlog client to any of the specified hosts due to: Connection timed out', ioe.getMessage()) + assertTrue(ioe.getCause() instanceof TimeoutException) + } + client.connectionTimeout = false + } + @Test void testBeginCommitTransaction() throws Exception { testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')