mirror of https://github.com/apache/nifi.git
NIFI-3741: Improved connection error message for CaptureChangeMySQL
This closes #1704. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
097548da9d
commit
cf0e8bb1d3
|
@ -640,6 +640,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
int connectionAttempts = 0;
|
int connectionAttempts = 0;
|
||||||
final int numHosts = hosts.size();
|
final int numHosts = hosts.size();
|
||||||
InetSocketAddress connectedHost = null;
|
InetSocketAddress connectedHost = null;
|
||||||
|
Exception lastConnectException = new Exception("Unknown connection error");
|
||||||
|
|
||||||
while (connectedHost == null && connectionAttempts < numHosts) {
|
while (connectedHost == null && connectionAttempts < numHosts) {
|
||||||
if (binlogClient == null) {
|
if (binlogClient == null) {
|
||||||
|
@ -682,11 +683,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
transitUri = "<unknown>";
|
transitUri = "<unknown>";
|
||||||
currentHost = (currentHost + 1) % numHosts;
|
currentHost = (currentHost + 1) % numHosts;
|
||||||
connectionAttempts++;
|
connectionAttempts++;
|
||||||
|
lastConnectException = te;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!binlogClient.isConnected()) {
|
if (!binlogClient.isConnected()) {
|
||||||
binlogClient = null;
|
binlogClient = null;
|
||||||
throw new IOException("Could not connect binlog client to any of the specified hosts");
|
throw new IOException("Could not connect binlog client to any of the specified hosts due to: " + lastConnectException.getMessage(), lastConnectException);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (createEnrichmentConnection) {
|
if (createEnrichmentConnection) {
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.nifi.cdc.event.ColumnDefinition
|
||||||
import org.apache.nifi.cdc.event.TableInfo
|
import org.apache.nifi.cdc.event.TableInfo
|
||||||
import org.apache.nifi.cdc.event.TableInfoCacheKey
|
import org.apache.nifi.cdc.event.TableInfoCacheKey
|
||||||
import org.apache.nifi.cdc.event.io.EventWriter
|
import org.apache.nifi.cdc.event.io.EventWriter
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException
|
||||||
import org.apache.nifi.provenance.ProvenanceEventType
|
import org.apache.nifi.provenance.ProvenanceEventType
|
||||||
import org.apache.nifi.reporting.InitializationException
|
import org.apache.nifi.reporting.InitializationException
|
||||||
import org.apache.nifi.state.MockStateManager
|
import org.apache.nifi.state.MockStateManager
|
||||||
|
@ -59,6 +60,7 @@ import java.sql.Connection
|
||||||
import java.sql.ResultSet
|
import java.sql.ResultSet
|
||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
import java.sql.Statement
|
import java.sql.Statement
|
||||||
|
import java.util.concurrent.TimeoutException
|
||||||
import java.util.regex.Matcher
|
import java.util.regex.Matcher
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
|
|
||||||
|
@ -88,6 +90,38 @@ class CaptureChangeMySQLTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConnectionFailures() throws Exception {
|
||||||
|
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
|
||||||
|
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
|
||||||
|
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
|
||||||
|
client.connectionError = true
|
||||||
|
try {
|
||||||
|
testRunner.run()
|
||||||
|
} catch (AssertionError ae) {
|
||||||
|
def pe = ae.getCause()
|
||||||
|
assertTrue(pe instanceof ProcessException)
|
||||||
|
def ioe = pe.getCause()
|
||||||
|
assertTrue(ioe instanceof IOException)
|
||||||
|
assertEquals('Could not connect binlog client to any of the specified hosts due to: Error during connect', ioe.getMessage())
|
||||||
|
assertTrue(ioe.getCause() instanceof IOException)
|
||||||
|
}
|
||||||
|
client.connectionError = false
|
||||||
|
|
||||||
|
client.connectionTimeout = true
|
||||||
|
try {
|
||||||
|
testRunner.run()
|
||||||
|
} catch (AssertionError ae) {
|
||||||
|
def pe = ae.getCause()
|
||||||
|
assertTrue(pe instanceof ProcessException)
|
||||||
|
def ioe = pe.getCause()
|
||||||
|
assertTrue(ioe instanceof IOException)
|
||||||
|
assertEquals('Could not connect binlog client to any of the specified hosts due to: Connection timed out', ioe.getMessage())
|
||||||
|
assertTrue(ioe.getCause() instanceof TimeoutException)
|
||||||
|
}
|
||||||
|
client.connectionTimeout = false
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testBeginCommitTransaction() throws Exception {
|
void testBeginCommitTransaction() throws Exception {
|
||||||
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
|
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
|
||||||
|
|
Loading…
Reference in New Issue