NIFI-5501: Fixed classloader issue leading to multiple abandoned threads (#7031)

This commit is contained in:
Matt Burgess 2023-03-10 15:37:32 -05:00 committed by GitHub
parent ed1cee4520
commit c1890a5bb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 15 deletions

View File

@ -29,6 +29,7 @@ import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -92,13 +93,11 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
@ -161,6 +160,7 @@ import static org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy.MAX_EVENTS
@WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
+ "application/json")
})
@RequiresInstanceClassLoading
public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Random invalid constant used as an indicator to not set the binlog position on the client (thereby using the latest available)
@ -257,6 +257,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
@ -1403,16 +1404,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
protected void registerDriver(String locationString, String drvName) throws InitializationException {
if (locationString != null && locationString.length() > 0) {
try {
// Split and trim the entries
final ClassLoader classLoader = ClassLoaderUtils.getCustomClassLoader(
locationString,
this.getClass().getClassLoader(),
(dir, name) -> name != null && name.endsWith(".jar")
);
// Workaround which allows to use URLClassLoader for JDBC driver loading.
// (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.)
final Class<?> clazz = Class.forName(drvName, true, classLoader);
final Class<?> clazz = Class.forName(drvName);
if (clazz == null) {
throw new InitializationException("Can't load Database Driver " + drvName);
}
@ -1421,8 +1413,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
} catch (final InitializationException e) {
throw e;
} catch (final MalformedURLException e) {
throw new InitializationException("Invalid Database Driver Jar Url", e);
} catch (final Exception e) {
throw new InitializationException("Can't load Database Driver", e);
}

View File

@ -31,6 +31,7 @@ import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
import com.github.shyiko.mysql.binlog.network.SSLMode
import groovy.json.JsonSlurper
import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading
import org.apache.nifi.cdc.event.ColumnDefinition
import org.apache.nifi.cdc.event.TableInfo
import org.apache.nifi.cdc.event.TableInfoCacheKey
@ -1459,7 +1460,7 @@ class CaptureChangeMySQLTest {
/********************************
* Mock and helper classes below
********************************/
@RequiresInstanceClassLoading
class MockCaptureChangeMySQL extends CaptureChangeMySQL {
Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>()