support using mariadb connector with mysql extensions (#11402)

* support using mariadb connector with mysql extensions

* cleanup and more tests

* fix test

* javadocs, more tests, etc

* style and more test

* more test more better

* missing pom

* more pom
This commit is contained in:
Clint Wylie 2021-07-08 12:25:37 -07:00 committed by GitHub
parent 3481bb0440
commit 63fcd77c38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1099 additions and 278 deletions

View File

@ -610,6 +610,11 @@ jobs:
jdk: openjdk8 jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
- <<: *integration_query
name: "(Compile=openjdk8, Run=openjdk8) query integration test (mariaDB)"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query' USE_INDEXER='middleManager' MYSQL_DRIVER_CLASSNAME='org.mariadb.jdbc.Driver'
# END - Integration tests for Compile with Java 8 and Run with Java 8 # END - Integration tests for Compile with Java 8 and Run with Java 8
# START - Integration tests for Compile with Java 8 and Run with Java 11 # START - Integration tests for Compile with Java 8 and Run with Java 11
@ -683,6 +688,11 @@ jobs:
jdk: openjdk8 jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' env: TESTNG_GROUPS='-Dgroups=high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager'
- <<: *integration_query
name: "(Compile=openjdk8, Run=openjdk11) query integration test (mariaDB)"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' MYSQL_DRIVER_CLASSNAME='org.mariadb.jdbc.Driver'
# END - Integration tests for Compile with Java 8 and Run with Java 11 # END - Integration tests for Compile with Java 8 and Run with Java 11
- &integration_batch_index_k8s - &integration_batch_index_k8s

View File

@ -349,11 +349,35 @@
<version>${mockito.version}</version> <version>${mockito.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -49,16 +49,19 @@ public class MetadataStorageConnectorConfig
@JsonProperty("dbcp") @JsonProperty("dbcp")
private Properties dbcpProperties; private Properties dbcpProperties;
@JsonProperty
public boolean isCreateTables() public boolean isCreateTables()
{ {
return createTables; return createTables;
} }
@JsonProperty
public String getHost() public String getHost()
{ {
return host; return host;
} }
@JsonProperty
public int getPort() public int getPort()
{ {
return port; return port;
@ -73,16 +76,19 @@ public class MetadataStorageConnectorConfig
return connectURI; return connectURI;
} }
@JsonProperty
public String getUser() public String getUser()
{ {
return user; return user;
} }
@JsonProperty
public String getPassword() public String getPassword()
{ {
return passwordProvider == null ? null : passwordProvider.getPassword(); return passwordProvider == null ? null : passwordProvider.getPassword();
} }
@JsonProperty("dbcp")
public Properties getDbcpProperties() public Properties getDbcpProperties()
{ {
return dbcpProperties; return dbcpProperties;
@ -132,6 +138,7 @@ public class MetadataStorageConnectorConfig
: !getDbcpProperties().equals(that.getDbcpProperties())) { : !getDbcpProperties().equals(that.getDbcpProperties())) {
return false; return false;
} }
return passwordProvider != null ? passwordProvider.equals(that.passwordProvider) : that.passwordProvider == null; return passwordProvider != null ? passwordProvider.equals(that.passwordProvider) : that.passwordProvider == null;
} }

View File

@ -19,18 +19,33 @@
package org.apache.druid.utils; package org.apache.druid.utils;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.IAE;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Properties;
import java.util.Set; import java.util.Set;
public final class ConnectionUriUtils public final class ConnectionUriUtils
{ {
private static final String MARIADB_EXTRAS = "nonMappedOptions";
// Note: MySQL JDBC connector 8 supports 7 other protocols than just `jdbc:mysql:` // Note: MySQL JDBC connector 8 supports 7 other protocols than just `jdbc:mysql:`
// (https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html). // (https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html).
// We should consider either expanding recognized mysql protocols or restricting allowed protocols to // We should consider either expanding recognized mysql protocols or restricting allowed protocols to
// just a basic one. // just a basic one.
public static final String MYSQL_PREFIX = "jdbc:mysql:"; public static final String MYSQL_PREFIX = "jdbc:mysql:";
public static final String POSTGRES_PREFIX = "jdbc:postgresql:"; public static final String POSTGRES_PREFIX = "jdbc:postgresql:";
public static final String MARIADB_PREFIX = "jdbc:mariadb:";
public static final String POSTGRES_DRIVER = "org.postgresql.Driver";
public static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
public static final String MYSQL_NON_REGISTERING_DRIVER = "com.mysql.jdbc.NonRegisteringDriver";
public static final String MARIADB_DRIVER = "org.mariadb.jdbc.Driver";
/** /**
* This method checks {@param actualProperties} against {@param allowedProperties} if they are not system properties. * This method checks {@param actualProperties} against {@param allowedProperties} if they are not system properties.
@ -57,6 +72,211 @@ public final class ConnectionUriUtils
} }
} }
/**
* This method tries to determine the correct type of database for a given JDBC connection string URI, then load the
* driver using reflection to parse the uri parameters, returning the set of keys which can be used for JDBC
* parameter whitelist validation.
*
* uris starting with {@link #MYSQL_PREFIX} will first try to use the MySQL Connector/J driver (5.x), then fallback
* to MariaDB Connector/J (version 2.x) which also accepts jdbc:mysql uris. This method does not attempt to use
* MariaDB Connector/J 3.x alpha driver (at the time of these javadocs, it only handles the jdbc:mariadb prefix)
*
* uris starting with {@link #POSTGRES_PREFIX} will use the postgresql driver to parse the uri
*
* uris starting with {@link #MARIADB_PREFIX} will first try to use MariaDB Connector/J driver (2.x) then fallback to
* MariaDB Connector/J 3.x driver.
*
* If the uri does not match any of these schemes, this method will return an empty set if unknown uris are allowed,
* or throw an exception if not.
*/
public static Set<String> tryParseJdbcUriParameters(String connectionUri, boolean allowUnknown)
{
if (connectionUri.startsWith(MYSQL_PREFIX)) {
try {
return tryParseMySqlConnectionUri(connectionUri);
}
catch (ClassNotFoundException notFoundMysql) {
try {
return tryParseMariaDb2xConnectionUri(connectionUri);
}
catch (ClassNotFoundException notFoundMaria2x) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
notFoundMysql
);
}
catch (IllegalArgumentException iaeMaria2x) {
throw iaeMaria2x;
}
catch (Throwable otherMaria2x) {
throw new RuntimeException(otherMaria2x);
}
}
catch (IllegalArgumentException iaeMySql) {
throw iaeMySql;
}
catch (Throwable otherMysql) {
throw new RuntimeException(otherMysql);
}
} else if (connectionUri.startsWith(MARIADB_PREFIX)) {
try {
return tryParseMariaDb2xConnectionUri(connectionUri);
}
catch (ClassNotFoundException notFoundMaria2x) {
try {
return tryParseMariaDb3xConnectionUri(connectionUri);
}
catch (ClassNotFoundException notFoundMaria3x) {
throw new RuntimeException(
"Failed to find MariaDB driver class. Please check the MariaDB connector version 2.7.3 is in the classpath",
notFoundMaria2x
);
}
catch (IllegalArgumentException iaeMaria3x) {
throw iaeMaria3x;
}
catch (Throwable otherMaria3x) {
throw new RuntimeException(otherMaria3x);
}
}
catch (IllegalArgumentException iaeMaria2x) {
throw iaeMaria2x;
}
catch (Throwable otherMaria2x) {
throw new RuntimeException(otherMaria2x);
}
} else if (connectionUri.startsWith(POSTGRES_PREFIX)) {
try {
return tryParsePostgresConnectionUri(connectionUri);
}
catch (IllegalArgumentException iaePostgres) {
throw iaePostgres;
}
catch (Throwable otherPostgres) {
// no special handling for class not found because postgres driver is in distribution and should be available.
throw new RuntimeException(otherPostgres);
}
} else {
if (!allowUnknown) {
throw new IAE("Unknown JDBC connection scheme: %s", connectionUri.split(":")[1]);
}
return Collections.emptySet();
}
}
public static Set<String> tryParsePostgresConnectionUri(String connectionUri)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException
{
Class<?> driverClass = Class.forName(POSTGRES_DRIVER);
Method parseUrl = driverClass.getMethod("parseURL", String.class, Properties.class);
Properties properties = (Properties) parseUrl.invoke(null, connectionUri, null);
if (properties == null) {
throw new IAE("Invalid URL format for PostgreSQL: [%s]", connectionUri);
}
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> keys.add((String) k));
return keys;
}
public static Set<String> tryParseMySqlConnectionUri(String connectionUri)
throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException,
InvocationTargetException
{
Class<?> driverClass = Class.forName(MYSQL_NON_REGISTERING_DRIVER);
Method parseUrl = driverClass.getMethod("parseURL", String.class, Properties.class);
// almost the same as postgres, but is an instance level method
Properties properties = (Properties) parseUrl.invoke(driverClass.getConstructor().newInstance(), connectionUri, null);
if (properties == null) {
throw new IAE("Invalid URL format for MySQL: [%s]", connectionUri);
}
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> keys.add((String) k));
return keys;
}
public static Set<String> tryParseMariaDb2xConnectionUri(String connectionUri)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException,
NoSuchFieldException, InstantiationException
{
// these are a bit more complicated
Class<?> urlParserClass = Class.forName("org.mariadb.jdbc.UrlParser");
Class<?> optionsClass = Class.forName("org.mariadb.jdbc.util.Options");
Method parseUrl = urlParserClass.getMethod("parse", String.class);
Method getOptions = urlParserClass.getMethod("getOptions");
Object urlParser = parseUrl.invoke(null, connectionUri);
if (urlParser == null) {
throw new IAE("Invalid URL format for MariaDB: [%s]", connectionUri);
}
Object options = getOptions.invoke(urlParser);
Field nonMappedOptionsField = optionsClass.getField(MARIADB_EXTRAS);
Properties properties = (Properties) nonMappedOptionsField.get(options);
Field[] fields = optionsClass.getDeclaredFields();
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size() + fields.length);
properties.forEach((k, v) -> keys.add((String) k));
Object defaultOptions = optionsClass.getConstructor().newInstance();
for (Field field : fields) {
if (field.getName().equals(MARIADB_EXTRAS)) {
continue;
}
try {
if (!Objects.equal(field.get(options), field.get(defaultOptions))) {
keys.add(field.getName());
}
}
catch (IllegalAccessException ignored) {
// ignore stuff we aren't allowed to read
}
}
return keys;
}
public static Set<String> tryParseMariaDb3xConnectionUri(String connectionUri)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException,
InstantiationException
{
Class<?> configurationClass = Class.forName("org.mariadb.jdbc.Configuration");
Class<?> configurationBuilderClass = Class.forName("org.mariadb.jdbc.Configuration$Builder");
Method parseUrl = configurationClass.getMethod("parse", String.class);
Method buildMethod = configurationBuilderClass.getMethod("build");
Object configuration = parseUrl.invoke(null, connectionUri);
if (configuration == null) {
throw new IAE("Invalid URL format for MariaDB: [%s]", connectionUri);
}
Method nonMappedOptionsGetter = configurationClass.getMethod(MARIADB_EXTRAS);
Properties properties = (Properties) nonMappedOptionsGetter.invoke(configuration);
Field[] fields = configurationClass.getDeclaredFields();
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size() + fields.length);
properties.forEach((k, v) -> keys.add((String) k));
Object defaultConfiguration = buildMethod.invoke(configurationBuilderClass.getConstructor().newInstance());
for (Field field : fields) {
if (field.getName().equals(MARIADB_EXTRAS)) {
continue;
}
try {
final Method fieldGetter = configurationClass.getMethod(field.getName());
if (!Objects.equal(fieldGetter.invoke(configuration), fieldGetter.invoke(defaultConfiguration))) {
keys.add(field.getName());
}
}
catch (IllegalAccessException | NoSuchMethodException ignored) {
// ignore stuff we aren't allowed to read
}
}
return keys;
}
private ConnectionUriUtils() private ConnectionUriUtils()
{ {
} }

View File

@ -20,17 +20,28 @@
package org.apache.druid.utils; package org.apache.druid.utils;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.runners.Enclosed; import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.util.Set;
@RunWith(Enclosed.class) @RunWith(Enclosed.class)
public class ConnectionUriUtilsTest public class ConnectionUriUtilsTest
{ {
public static class ThrowIfURLHasNotAllowedPropertiesTest public static class ThrowIfURLHasNotAllowedPropertiesTest
{ {
private static final String MYSQL_URI = "jdbc:mysql://localhost:3306/test?user=druid&password=diurd&keyonly&otherOptions=wat";
private static final String MARIA_URI = "jdbc:mariadb://localhost:3306/test?user=druid&password=diurd&keyonly&otherOptions=wat";
private static final String POSTGRES_URI = "jdbc:postgresql://localhost:3306/test?user=druid&password=diurd&keyonly&otherOptions=wat";
private static final String UNKNOWN_URI = "jdbc:druid://localhost:8888/query/v2/sql/avatica?user=druid&password=diurd&keyonly&otherOptions=wat";
@Rule @Rule
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
@ -86,5 +97,180 @@ public class ConnectionUriUtilsTest
ImmutableSet.of("valid_key1", "valid_key2") ImmutableSet.of("valid_key1", "valid_key2")
); );
} }
@Test
public void testTryParses()
{
Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(POSTGRES_URI, false);
Assert.assertEquals(7, props.size());
props = ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false);
// though this would be 4 if mysql wasn't loaded in classpath because it would fall back to mariadb
Assert.assertEquals(9, props.size());
props = ConnectionUriUtils.tryParseJdbcUriParameters(MARIA_URI, false);
Assert.assertEquals(4, props.size());
}
@Test
public void testTryParseUnknown()
{
Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(UNKNOWN_URI, true);
Assert.assertEquals(0, props.size());
expectedException.expect(IAE.class);
ConnectionUriUtils.tryParseJdbcUriParameters(UNKNOWN_URI, false);
}
@Test
public void tryParseInvalidPostgres()
{
expectedException.expect(IAE.class);
ConnectionUriUtils.tryParseJdbcUriParameters("jdbc:postgresql://bad:1234&param", true);
}
@Test
public void tryParseInvalidMySql()
{
expectedException.expect(IAE.class);
ConnectionUriUtils.tryParseJdbcUriParameters("jdbc:mysql:/bad", true);
}
@Test
public void testMySqlFallbackMySqlMaria2x()
{
MockedStatic<ConnectionUriUtils> utils = Mockito.mockStatic(ConnectionUriUtils.class);
utils.when(() -> ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false)).thenCallRealMethod();
utils.when(() -> ConnectionUriUtils.tryParseMySqlConnectionUri(MYSQL_URI)).thenThrow(ClassNotFoundException.class);
utils.when(() -> ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MYSQL_URI)).thenCallRealMethod();
Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false);
// this would be 9 if didn't fall back to mariadb
Assert.assertEquals(4, props.size());
utils.close();
}
@Test
public void testMariaFallbackMaria3x()
{
MockedStatic<ConnectionUriUtils> utils = Mockito.mockStatic(ConnectionUriUtils.class);
utils.when(() -> ConnectionUriUtils.tryParseJdbcUriParameters(MARIA_URI, false)).thenCallRealMethod();
utils.when(() -> ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MARIA_URI)).thenThrow(ClassNotFoundException.class);
utils.when(() -> ConnectionUriUtils.tryParseMariaDb3xConnectionUri(MARIA_URI)).thenCallRealMethod();
try {
Set<String> props = ConnectionUriUtils.tryParseJdbcUriParameters(MARIA_URI, false);
// this would be 4 if didn't fall back to mariadb 3x
Assert.assertEquals(8, props.size());
}
catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains("Failed to find MariaDB driver class"));
}
utils.close();
}
@Test
public void testMySqlFallbackMySqlNoDrivers()
{
MockedStatic<ConnectionUriUtils> utils = Mockito.mockStatic(ConnectionUriUtils.class);
utils.when(() -> ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false)).thenCallRealMethod();
utils.when(() -> ConnectionUriUtils.tryParseMySqlConnectionUri(MYSQL_URI)).thenThrow(ClassNotFoundException.class);
utils.when(() -> ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MYSQL_URI)).thenThrow(ClassNotFoundException.class);
try {
ConnectionUriUtils.tryParseJdbcUriParameters(MYSQL_URI, false);
}
catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains("Failed to find MySQL driver class"));
}
utils.close();
}
@Test
public void testPosgresDriver() throws Exception
{
Set<String> props = ConnectionUriUtils.tryParsePostgresConnectionUri(POSTGRES_URI);
Assert.assertEquals(7, props.size());
// postgres adds a few extra system properties, PGDBNAME, PGHOST, PGPORT
Assert.assertTrue(props.contains("user"));
Assert.assertTrue(props.contains("password"));
Assert.assertTrue(props.contains("otherOptions"));
Assert.assertTrue(props.contains("keyonly"));
}
@Test
public void testMySQLDriver() throws Exception
{
Set<String> props = ConnectionUriUtils.tryParseMySqlConnectionUri(MYSQL_URI);
// mysql actually misses 'keyonly', but spits out several keys that are not actually uri parameters
// DBNAME, HOST, PORT, HOST.1, PORT.1, NUM_HOSTS
Assert.assertEquals(9, props.size());
Assert.assertTrue(props.contains("user"));
Assert.assertTrue(props.contains("password"));
Assert.assertTrue(props.contains("otherOptions"));
Assert.assertFalse(props.contains("keyonly"));
}
@Test
public void testMariaDb2xDriver() throws Throwable
{
Set<String> props = ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MYSQL_URI);
// mariadb doesn't spit out any extras other than what the user specified
Assert.assertEquals(4, props.size());
Assert.assertTrue(props.contains("user"));
Assert.assertTrue(props.contains("password"));
Assert.assertTrue(props.contains("otherOptions"));
Assert.assertTrue(props.contains("keyonly"));
props = ConnectionUriUtils.tryParseMariaDb2xConnectionUri(MARIA_URI);
Assert.assertEquals(4, props.size());
Assert.assertTrue(props.contains("user"));
Assert.assertTrue(props.contains("password"));
Assert.assertTrue(props.contains("otherOptions"));
Assert.assertTrue(props.contains("keyonly"));
}
@Test(expected = ClassNotFoundException.class)
public void testMariaDb3xDriver() throws Exception
{
// at the time of adding this test, mariadb connector/j 3.x does not actually parse jdbc:mysql uris
// so this would throw an IAE.class instead of ClassNotFoundException.class if the connector is swapped out
// in maven dependencies
ConnectionUriUtils.tryParseMariaDb3xConnectionUri(MYSQL_URI);
}
@Test(expected = ClassNotFoundException.class)
public void testMariaDb3xDriverMariaUri() throws Exception
{
// mariadb 3.x driver cannot be loaded alongside 2.x, so this will fail with class not found
// however, if we swap out version in pom then we end up with 8 keys where
// "database", "addresses", "codecs", and "initialUrl" are added as extras
// we should perhaps consider adding them to built-in allowed lists in the future when this driver is no longer
// an alpha release
Set<String> props = ConnectionUriUtils.tryParseMariaDb3xConnectionUri(MARIA_URI);
Assert.assertEquals(8, props.size());
Assert.assertTrue(props.contains("user"));
Assert.assertTrue(props.contains("password"));
Assert.assertTrue(props.contains("otherOptions"));
Assert.assertTrue(props.contains("keyonly"));
}
@Test(expected = IAE.class)
public void testPostgresInvalidArgs() throws Exception
{
ConnectionUriUtils.tryParsePostgresConnectionUri(MYSQL_URI);
}
@Test(expected = IAE.class)
public void testMySqlInvalidArgs() throws Exception
{
ConnectionUriUtils.tryParseMySqlConnectionUri(POSTGRES_URI);
}
@Test(expected = IAE.class)
public void testMariaDbInvalidArgs() throws Exception
{
ConnectionUriUtils.tryParseMariaDb2xConnectionUri(POSTGRES_URI);
}
} }
} }

View File

@ -0,0 +1,33 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
ARG DRUID_RELEASE
FROM $DRUID_RELEASE
WORKDIR /opt/druid/extensions/mysql-metadata-storage
ARG MARIA_URL=https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/2.7.3/mariadb-java-client-2.7.3.jar
ARG MARIA_JAR=mariadb-java-client-2.7.3.jar
ARG MARIA_SHA=4a2edc05bd882ad19371d2615c2635dccf8d74f0
RUN wget -q ${MARIA_URL} \
&& echo "${MARIA_SHA} ${MARIA_JAR}" | sha1sum -c \
&& ln -s ../extensions/mysql-metadata-storage/${MARIA_JAR} /opt/druid/lib
WORKDIR /opt/druid

View File

@ -110,16 +110,6 @@
<artifactId>jsr311-api</artifactId> <artifactId>jsr311-api</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>
@ -177,5 +167,17 @@
<artifactId>powermock-api-easymock</artifactId> <artifactId>powermock-api-easymock</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -24,23 +24,15 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.mysql.jdbc.NonRegisteringDriver;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig; import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.ConnectionUriUtils; import org.apache.druid.utils.ConnectionUriUtils;
import org.apache.druid.utils.Throwables;
import org.joda.time.Period; import org.joda.time.Period;
import org.postgresql.Driver;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.validation.constraints.Min; import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.sql.SQLException;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import java.util.Set;
/** /**
* *
@ -92,13 +84,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
/** /**
* Check the given URL whether it contains non-allowed properties. * Check the given URL whether it contains non-allowed properties.
* *
* This method should be in sync with the following methods:
*
* - {@code org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()}
* - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
* - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
*
* @see JdbcAccessSecurityConfig#getAllowedProperties() * @see JdbcAccessSecurityConfig#getAllowedProperties()
* @see ConnectionUriUtils#tryParseJdbcUriParameters(String, boolean)
*/ */
private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig) private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
{ {
@ -109,64 +96,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
return; return;
} }
@Nullable final Properties properties; // null when url has an invalid format
if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
try {
NonRegisteringDriver driver = new NonRegisteringDriver();
properties = driver.parseURL(url, null);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
try {
properties = Driver.parseURL(url, null);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("org/postgresql/Driver")) {
throw new RuntimeException(
"Failed to find PostgreSQL driver class. "
+ "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else {
if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
properties = new Properties();
} else {
// unknown format but it is not allowed
throw new IAE("Unknown JDBC connection scheme: %s", url.split(":")[1]);
}
}
if (properties == null) {
// There is something wrong with the URL format.
throw new IAE("Invalid URL format [%s]", url);
}
final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> propertyKeys.add((String) k));
ConnectionUriUtils.throwIfPropertiesAreNotAllowed( ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
propertyKeys, ConnectionUriUtils.tryParseJdbcUriParameters(url, securityConfig.isAllowUnknownJdbcUrlFormat()),
securityConfig.getSystemPropertyPrefixes(), securityConfig.getSystemPropertyPrefixes(),
securityConfig.getAllowedProperties() securityConfig.getAllowedProperties()
); );

View File

@ -155,7 +155,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
public void testWhenInvalidUrlFormat() public void testWhenInvalidUrlFormat()
{ {
expectedException.expect(IllegalArgumentException.class); expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]"); expectedException.expectMessage("Invalid URL format for MySQL: [jdbc:mysql:/invalid-url::3006]");
new JdbcExtractionNamespace( new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig() new MetadataStorageConnectorConfig()
{ {
@ -305,7 +305,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
public void testWhenInvalidUrlFormat() public void testWhenInvalidUrlFormat()
{ {
expectedException.expect(IllegalArgumentException.class); expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]"); expectedException.expectMessage("Invalid URL format for PostgreSQL: [jdbc:postgresql://invalid-url::3006]");
new JdbcExtractionNamespace( new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig() new MetadataStorageConnectorConfig()
{ {

View File

@ -96,16 +96,6 @@
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- Tests --> <!-- Tests -->
<dependency> <dependency>
@ -139,6 +129,17 @@
<type>test-jar</type> <type>test-jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -23,10 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mysql.jdbc.NonRegisteringDriver;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -34,8 +31,6 @@ import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig; import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.DataFetcher; import org.apache.druid.server.lookup.DataFetcher;
import org.apache.druid.utils.ConnectionUriUtils; import org.apache.druid.utils.ConnectionUriUtils;
import org.apache.druid.utils.Throwables;
import org.postgresql.Driver;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
@ -47,8 +42,6 @@ import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier; import java.util.function.Supplier;
public class JdbcDataFetcher implements DataFetcher<String, String> public class JdbcDataFetcher implements DataFetcher<String, String>
@ -124,13 +117,8 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
/** /**
* Check the given URL whether it contains non-allowed properties. * Check the given URL whether it contains non-allowed properties.
* *
* This method should be in sync with the following methods:
*
* - {@code org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()}
* - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
* - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
*
* @see JdbcAccessSecurityConfig#getAllowedProperties() * @see JdbcAccessSecurityConfig#getAllowedProperties()
* @see ConnectionUriUtils#tryParseJdbcUriParameters(String, boolean)
*/ */
private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig) private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
{ {
@ -141,64 +129,8 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
return; return;
} }
@Nullable final Properties properties;
if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
try {
NonRegisteringDriver driver = new NonRegisteringDriver();
properties = driver.parseURL(url, null);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
try {
properties = Driver.parseURL(url, null);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("org/postgresql/Driver")) {
throw new RuntimeException(
"Failed to find PostgreSQL driver class. "
+ "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else {
if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
properties = new Properties();
} else {
// unknown format but it is not allowed
throw new IAE("Unknown JDBC connection scheme: %s", url.split(":")[1]);
}
}
if (properties == null) {
// There is something wrong with the URL format.
throw new IAE("Invalid URL format [%s]", url);
}
final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> propertyKeys.add((String) k));
ConnectionUriUtils.throwIfPropertiesAreNotAllowed( ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
propertyKeys, ConnectionUriUtils.tryParseJdbcUriParameters(url, securityConfig.isAllowUnknownJdbcUrlFormat()),
securityConfig.getSystemPropertyPrefixes(), securityConfig.getSystemPropertyPrefixes(),
securityConfig.getAllowedProperties() securityConfig.getAllowedProperties()
); );

View File

@ -147,7 +147,7 @@ public class JdbcDataFetcherUrlCheckTest
public void testWhenInvalidUrlFormat() public void testWhenInvalidUrlFormat()
{ {
expectedException.expect(IllegalArgumentException.class); expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]"); expectedException.expectMessage("Invalid URL format for MySQL: [jdbc:mysql:/invalid-url::3006]");
new JdbcDataFetcher( new JdbcDataFetcher(
new MetadataStorageConnectorConfig() new MetadataStorageConnectorConfig()
{ {
@ -289,7 +289,7 @@ public class JdbcDataFetcherUrlCheckTest
public void testWhenInvalidUrlFormat() public void testWhenInvalidUrlFormat()
{ {
expectedException.expect(IllegalArgumentException.class); expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]"); expectedException.expectMessage("Invalid URL format for PostgreSQL: [jdbc:postgresql://invalid-url::3006]");
new JdbcDataFetcher( new JdbcDataFetcher(
new MetadataStorageConnectorConfig() new MetadataStorageConnectorConfig()
{ {

View File

@ -88,11 +88,39 @@
<artifactId>commons-dbcp2</artifactId> <artifactId>commons-dbcp2</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -23,18 +23,15 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import com.mysql.jdbc.NonRegisteringDriver;
import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig; import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.Throwables; import org.apache.druid.utils.ConnectionUriUtils;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import java.sql.SQLException; import javax.annotation.Nullable;
import java.util.Properties; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -43,17 +40,27 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
{ {
private final DBI dbi; private final DBI dbi;
private final MetadataStorageConnectorConfig connectorConfig; private final MetadataStorageConnectorConfig connectorConfig;
@Nullable
private final String driverClassName;
@JsonCreator @JsonCreator
public MySQLFirehoseDatabaseConnector( public MySQLFirehoseDatabaseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig, @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
@JsonProperty("driverClassName") @Nullable String driverClassName,
@JacksonInject JdbcAccessSecurityConfig securityConfig @JacksonInject JdbcAccessSecurityConfig securityConfig
) )
{ {
this.connectorConfig = connectorConfig; this.connectorConfig = connectorConfig;
this.driverClassName = driverClassName;
final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig); final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig);
datasource.setDriverClassLoader(getClass().getClassLoader()); datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("com.mysql.jdbc.Driver"); if (driverClassName != null) {
datasource.setDriverClassName(driverClassName);
} else if (connectorConfig.getConnectURI().startsWith(ConnectionUriUtils.MARIADB_PREFIX)) {
datasource.setDriverClassName(ConnectionUriUtils.MARIADB_DRIVER);
} else {
datasource.setDriverClassName(ConnectionUriUtils.MYSQL_DRIVER);
}
this.dbi = new DBI(datasource); this.dbi = new DBI(datasource);
} }
@ -63,6 +70,13 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
return connectorConfig; return connectorConfig;
} }
@Nullable
@JsonProperty
public String getDriverClassName()
{
return driverClassName;
}
@Override @Override
public DBI getDBI() public DBI getDBI()
{ {
@ -70,37 +84,30 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
} }
@Override @Override
public Set<String> findPropertyKeysFromConnectURL(String connectUrl) public Set<String> findPropertyKeysFromConnectURL(String connectUrl, boolean allowUnknown)
{ {
// This method should be in sync with return ConnectionUriUtils.tryParseJdbcUriParameters(connectUrl, allowUnknown);
// - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL() }
// - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
Properties properties;
try {
NonRegisteringDriver driver = new NonRegisteringDriver();
properties = driver.parseURL(connectUrl, null);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
if (properties == null) { @Override
throw new IAE("Invalid URL format for MySQL: [%s]", connectUrl); public boolean equals(Object o)
{
if (this == o) {
return true;
} }
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size()); if (o == null || getClass() != o.getClass()) {
properties.forEach((k, v) -> keys.add((String) k)); return false;
return keys; }
MySQLFirehoseDatabaseConnector that = (MySQLFirehoseDatabaseConnector) o;
return connectorConfig.equals(that.connectorConfig) && Objects.equals(
driverClassName,
that.driverClassName
);
}
@Override
public int hashCode()
{
return Objects.hash(connectorConfig, driverClassName);
} }
} }

View File

@ -33,7 +33,6 @@ import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.metadata.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.StringMapper; import org.skife.jdbi.v2.util.StringMapper;
import java.io.File; import java.io.File;
@ -46,7 +45,6 @@ public class MySQLConnector extends SQLMetadataConnector
private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT"; private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
private static final String QUOTE_STRING = "`"; private static final String QUOTE_STRING = "`";
private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin"; private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
private static final String MYSQL_JDBC_DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver";
private final DBI dbi; private final DBI dbi;
@ -54,20 +52,20 @@ public class MySQLConnector extends SQLMetadataConnector
public MySQLConnector( public MySQLConnector(
Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables, Supplier<MetadataStorageTablesConfig> dbTables,
MySQLConnectorConfig connectorConfig MySQLConnectorSslConfig connectorSslConfig,
MySQLConnectorDriverConfig driverConfig
) )
{ {
super(config, dbTables); super(config, dbTables);
try { try {
Class.forName(MYSQL_JDBC_DRIVER_CLASS_NAME, false, getClass().getClassLoader()); Class.forName(driverConfig.getDriverClassName(), false, getClass().getClassLoader());
} }
catch (ClassNotFoundException e) { catch (ClassNotFoundException e) {
throw new ISE(e, "Could not find %s on the classpath. The MySQL Connector library is not included in the Druid " throw new ISE(e, "Could not find %s on the classpath. The MySQL Connector library is not included in the Druid "
+ "distribution but is required to use MySQL. Please download a compatible library (for example " + "distribution but is required to use MySQL. Please download a compatible library (for example "
+ "'mysql-connector-java-5.1.48.jar') and place it under 'extensions/mysql-metadata-storage/'. See " + "'mysql-connector-java-5.1.48.jar') and place it under 'extensions/mysql-metadata-storage/'. See "
+ "https://druid.apache.org/downloads for more details.", + "https://druid.apache.org/downloads for more details.",
MYSQL_JDBC_DRIVER_CLASS_NAME driverConfig.getDriverClassName()
); );
} }
@ -75,67 +73,67 @@ public class MySQLConnector extends SQLMetadataConnector
// MySQL driver is classloader isolated as part of the extension // MySQL driver is classloader isolated as part of the extension
// so we need to help JDBC find the driver // so we need to help JDBC find the driver
datasource.setDriverClassLoader(getClass().getClassLoader()); datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName(MYSQL_JDBC_DRIVER_CLASS_NAME); datasource.setDriverClassName(driverConfig.getDriverClassName());
datasource.addConnectionProperty("useSSL", String.valueOf(connectorConfig.isUseSSL())); datasource.addConnectionProperty("useSSL", String.valueOf(connectorSslConfig.isUseSSL()));
if (connectorConfig.isUseSSL()) { if (connectorSslConfig.isUseSSL()) {
log.info("SSL is enabled on this MySQL connection. "); log.info("SSL is enabled on this MySQL connection. ");
datasource.addConnectionProperty( datasource.addConnectionProperty(
"verifyServerCertificate", "verifyServerCertificate",
String.valueOf(connectorConfig.isVerifyServerCertificate()) String.valueOf(connectorSslConfig.isVerifyServerCertificate())
); );
if (connectorConfig.isVerifyServerCertificate()) { if (connectorSslConfig.isVerifyServerCertificate()) {
log.info("Server certificate verification is enabled. "); log.info("Server certificate verification is enabled. ");
if (connectorConfig.getTrustCertificateKeyStoreUrl() != null) { if (connectorSslConfig.getTrustCertificateKeyStoreUrl() != null) {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"trustCertificateKeyStoreUrl", "trustCertificateKeyStoreUrl",
new File(connectorConfig.getTrustCertificateKeyStoreUrl()).toURI().toString() new File(connectorSslConfig.getTrustCertificateKeyStoreUrl()).toURI().toString()
); );
} }
if (connectorConfig.getTrustCertificateKeyStoreType() != null) { if (connectorSslConfig.getTrustCertificateKeyStoreType() != null) {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"trustCertificateKeyStoreType", "trustCertificateKeyStoreType",
connectorConfig.getTrustCertificateKeyStoreType() connectorSslConfig.getTrustCertificateKeyStoreType()
); );
} }
if (connectorConfig.getTrustCertificateKeyStorePassword() == null) { if (connectorSslConfig.getTrustCertificateKeyStorePassword() == null) {
log.warn( log.warn(
"Trust store password is empty. Ensure that the trust store has been configured with an empty password."); "Trust store password is empty. Ensure that the trust store has been configured with an empty password.");
} else { } else {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"trustCertificateKeyStorePassword", "trustCertificateKeyStorePassword",
connectorConfig.getTrustCertificateKeyStorePassword() connectorSslConfig.getTrustCertificateKeyStorePassword()
); );
} }
} }
if (connectorConfig.getClientCertificateKeyStoreUrl() != null) { if (connectorSslConfig.getClientCertificateKeyStoreUrl() != null) {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"clientCertificateKeyStoreUrl", "clientCertificateKeyStoreUrl",
new File(connectorConfig.getClientCertificateKeyStoreUrl()).toURI().toString() new File(connectorSslConfig.getClientCertificateKeyStoreUrl()).toURI().toString()
); );
} }
if (connectorConfig.getClientCertificateKeyStoreType() != null) { if (connectorSslConfig.getClientCertificateKeyStoreType() != null) {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"clientCertificateKeyStoreType", "clientCertificateKeyStoreType",
connectorConfig.getClientCertificateKeyStoreType() connectorSslConfig.getClientCertificateKeyStoreType()
); );
} }
if (connectorConfig.getClientCertificateKeyStorePassword() != null) { if (connectorSslConfig.getClientCertificateKeyStorePassword() != null) {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"clientCertificateKeyStorePassword", "clientCertificateKeyStorePassword",
connectorConfig.getClientCertificateKeyStorePassword() connectorSslConfig.getClientCertificateKeyStorePassword()
); );
} }
Joiner joiner = Joiner.on(",").skipNulls(); Joiner joiner = Joiner.on(",").skipNulls();
if (connectorConfig.getEnabledSSLCipherSuites() != null) { if (connectorSslConfig.getEnabledSSLCipherSuites() != null) {
datasource.addConnectionProperty( datasource.addConnectionProperty(
"enabledSSLCipherSuites", "enabledSSLCipherSuites",
joiner.join(connectorConfig.getEnabledSSLCipherSuites()) joiner.join(connectorSslConfig.getEnabledSSLCipherSuites())
); );
} }
if (connectorConfig.getEnabledTLSProtocols() != null) { if (connectorSslConfig.getEnabledTLSProtocols() != null) {
datasource.addConnectionProperty("enabledTLSProtocols", joiner.join(connectorConfig.getEnabledTLSProtocols())); datasource.addConnectionProperty("enabledTLSProtocols", joiner.join(connectorSslConfig.getEnabledTLSProtocols()));
} }
} }
@ -220,24 +218,19 @@ public class MySQLConnector extends SQLMetadataConnector
) )
{ {
return getDBI().withHandle( return getDBI().withHandle(
new HandleCallback<Void>() handle -> {
{ handle.createStatement(
@Override StringUtils.format(
public Void withHandle(Handle handle) "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
{ tableName,
handle.createStatement( keyColumn,
StringUtils.format( valueColumn
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value", )
tableName, )
keyColumn, .bind("key", key)
valueColumn .bind("value", value)
) .execute();
) return null;
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
} }
); );
} }

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata.storage.mysql;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class MySQLConnectorDriverConfig
{
@JsonProperty
private String driverClassName = "com.mysql.jdbc.Driver";
@JsonProperty
public String getDriverClassName()
{
return driverClassName;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MySQLConnectorDriverConfig that = (MySQLConnectorDriverConfig) o;
return driverClassName.equals(that.driverClassName);
}
@Override
public int hashCode()
{
return Objects.hash(driverClassName);
}
@Override
public String toString()
{
return "MySQLConnectorDriverConfig{" +
"driverClassName='" + driverClassName + '\'' +
'}';
}
}

View File

@ -24,7 +24,7 @@ import org.apache.druid.metadata.PasswordProvider;
import java.util.List; import java.util.List;
public class MySQLConnectorConfig public class MySQLConnectorSslConfig
{ {
@JsonProperty @JsonProperty
private boolean useSSL = false; private boolean useSSL = false;
@ -109,7 +109,7 @@ public class MySQLConnectorConfig
@Override @Override
public String toString() public String toString()
{ {
return "MySQLConnectorConfig{" + return "MySQLConnectorSslConfig{" +
"useSSL='" + useSSL + '\'' + "useSSL='" + useSSL + '\'' +
", clientCertificateKeyStoreUrl='" + clientCertificateKeyStoreUrl + '\'' + ", clientCertificateKeyStoreUrl='" + clientCertificateKeyStoreUrl + '\'' +
", clientCertificateKeyStoreType='" + clientCertificateKeyStoreType + '\'' + ", clientCertificateKeyStoreType='" + clientCertificateKeyStoreType + '\'' +

View File

@ -65,7 +65,8 @@ public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule im
{ {
super.configure(binder); super.configure(binder);
JsonConfigProvider.bind(binder, "druid.metadata.mysql.ssl", MySQLConnectorConfig.class); JsonConfigProvider.bind(binder, "druid.metadata.mysql.ssl", MySQLConnectorSslConfig.class);
JsonConfigProvider.bind(binder, "druid.metadata.mysql.driver", MySQLConnectorDriverConfig.class);
PolyBind PolyBind
.optionBinder(binder, Key.get(MetadataStorageProvider.class)) .optionBinder(binder, Key.get(MetadataStorageProvider.class))

View File

@ -19,10 +19,17 @@
package org.apache.druid.firehose.sql; package org.apache.druid.firehose.sql;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.storage.mysql.MySQLMetadataStorageModule;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig; import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -31,9 +38,56 @@ import java.util.Set;
public class MySQLFirehoseDatabaseConnectorTest public class MySQLFirehoseDatabaseConnectorTest
{ {
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
private static final JdbcAccessSecurityConfig INJECTED_CONF = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
static {
MAPPER.registerModules(new MySQLMetadataStorageModule().getJacksonModules());
MAPPER.setInjectableValues(new InjectableValues.Std().addValue(JdbcAccessSecurityConfig.class, INJECTED_CONF));
}
@Rule @Rule
public final ExpectedException expectedException = ExpectedException.none(); public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws JsonProcessingException
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test";
}
};
MySQLFirehoseDatabaseConnector connector = new MySQLFirehoseDatabaseConnector(
connectorConfig,
null,
INJECTED_CONF
);
MySQLFirehoseDatabaseConnector andBack = MAPPER.readValue(MAPPER.writeValueAsString(connector), MySQLFirehoseDatabaseConnector.class);
Assert.assertEquals(connector, andBack);
// test again with classname
connector = new MySQLFirehoseDatabaseConnector(
connectorConfig,
"some.class.name.Driver",
INJECTED_CONF
);
andBack = MAPPER.readValue(MAPPER.writeValueAsString(connector), MySQLFirehoseDatabaseConnector.class);
Assert.assertEquals(connector, andBack);
}
@Test
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(MySQLFirehoseDatabaseConnector.class)
.usingGetClass()
.withNonnullFields("connectorConfig")
.withIgnoredFields("dbi")
.verify();
}
@Test @Test
public void testSuccessWhenNoPropertyInUriAndNoAllowlist() public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
{ {
@ -50,6 +104,7 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig securityConfig
); );
} }
@ -70,6 +125,7 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig securityConfig
); );
} }
@ -93,6 +149,7 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig securityConfig
); );
} }
@ -115,10 +172,36 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig securityConfig
); );
} }
@Test
public void testSuccessOnlyValidPropertyMariaDb()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mariadb://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(
ImmutableSet.of("user", "password", "keyonly", "etc")
);
new MySQLFirehoseDatabaseConnector(
connectorConfig,
null,
securityConfig
);
}
@Test @Test
public void testFailOnlyInvalidProperty() public void testFailOnlyInvalidProperty()
{ {
@ -138,6 +221,7 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig securityConfig
); );
} }
@ -161,6 +245,31 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig
);
}
@Test
public void testFailValidAndInvalidPropertyMariadb()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mariadb://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user", "nonenone"));
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new MySQLFirehoseDatabaseConnector(
connectorConfig,
null,
securityConfig securityConfig
); );
} }
@ -194,6 +303,7 @@ public class MySQLFirehoseDatabaseConnectorTest
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
securityConfig securityConfig
); );
} }
@ -211,10 +321,11 @@ public class MySQLFirehoseDatabaseConnectorTest
} }
}; };
expectedException.expect(IllegalArgumentException.class); expectedException.expect(RuntimeException.class);
expectedException.expectMessage(StringUtils.format("Invalid URL format for MySQL: [%s]", url)); expectedException.expectMessage(StringUtils.format("Invalid URL format for MySQL: [%s]", url));
new MySQLFirehoseDatabaseConnector( new MySQLFirehoseDatabaseConnector(
connectorConfig, connectorConfig,
null,
new JdbcAccessSecurityConfig() new JdbcAccessSecurityConfig()
); );
} }

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata.storage.mysql;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class MySQLConnectorDriverConfigTest
{
@Test
public void testEqualsAndHashcode()
{
EqualsVerifier.simple()
.forClass(MySQLConnectorDriverConfig.class)
.usingGetClass()
.withNonnullFields("driverClassName")
.verify();
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata.storage.mysql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.MetadataConfigModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
public class MySQLMetadataStorageModuleTest
{
@Test
public void testSslConfig()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.metadata.mysql.ssl";
final JsonConfigProvider<MySQLConnectorSslConfig> provider = JsonConfigProvider.of(
propertyPrefix,
MySQLConnectorSslConfig.class
);
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".useSSL", "true");
properties.setProperty(propertyPrefix + ".trustCertificateKeyStoreUrl", "url");
properties.setProperty(propertyPrefix + ".trustCertificateKeyStoreType", "type");
properties.setProperty(propertyPrefix + ".trustCertificateKeyStorePassword", "secret");
properties.setProperty(propertyPrefix + ".clientCertificateKeyStoreUrl", "url");
properties.setProperty(propertyPrefix + ".clientCertificateKeyStoreType", "type");
properties.setProperty(propertyPrefix + ".clientCertificateKeyStorePassword", "secret");
properties.setProperty(propertyPrefix + ".enabledSSLCipherSuites", "[\"some\", \"ciphers\"]");
properties.setProperty(propertyPrefix + ".enabledTLSProtocols", "[\"some\", \"protocols\"]");
properties.setProperty(propertyPrefix + ".verifyServerCertificate", "true");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final MySQLConnectorSslConfig config = provider.get().get();
Assert.assertTrue(config.isUseSSL());
Assert.assertEquals("url", config.getTrustCertificateKeyStoreUrl());
Assert.assertEquals("type", config.getTrustCertificateKeyStoreType());
Assert.assertEquals("secret", config.getTrustCertificateKeyStorePassword());
Assert.assertEquals("url", config.getClientCertificateKeyStoreUrl());
Assert.assertEquals("type", config.getClientCertificateKeyStoreType());
Assert.assertEquals("secret", config.getClientCertificateKeyStorePassword());
Assert.assertEquals(ImmutableList.of("some", "ciphers"), config.getEnabledSSLCipherSuites());
Assert.assertEquals(ImmutableList.of("some", "protocols"), config.getEnabledTLSProtocols());
Assert.assertTrue(config.isVerifyServerCertificate());
}
@Test
public void testDriverConfigDefault()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.metadata.mysql.driver";
final JsonConfigProvider<MySQLConnectorDriverConfig> provider = JsonConfigProvider.of(
propertyPrefix,
MySQLConnectorDriverConfig.class
);
final Properties properties = new Properties();
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final MySQLConnectorDriverConfig config = provider.get().get();
Assert.assertEquals(new MySQLConnectorDriverConfig().getDriverClassName(), config.getDriverClassName());
}
@Test
public void testDriverConfig()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.metadata.mysql.driver";
final JsonConfigProvider<MySQLConnectorDriverConfig> provider = JsonConfigProvider.of(
propertyPrefix,
MySQLConnectorDriverConfig.class
);
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".driverClassName", "some.driver.classname");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final MySQLConnectorDriverConfig config = provider.get().get();
Assert.assertEquals("some.driver.classname", config.getDriverClassName());
}
private Injector createInjector()
{
MySQLMetadataStorageModule module = new MySQLMetadataStorageModule();
Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
new MetadataConfigModule(),
new LifecycleModule(),
module,
new Module()
{
@Override
public void configure(Binder binder)
{
module.createBindingChoices(binder, "mysql");
}
@Provides
public ServiceEmitter getEmitter()
{
return new ServiceEmitter("test", "localhost", new NoopEmitter());
}
}
)
);
ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
mapper.registerModules(module.getJacksonModules());
return injector;
}
}

View File

@ -86,12 +86,34 @@
<artifactId>commons-dbcp2</artifactId> <artifactId>commons-dbcp2</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -23,16 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig; import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.postgresql.Driver; import org.apache.druid.utils.ConnectionUriUtils;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import java.util.Properties; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -68,19 +66,35 @@ public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConn
} }
@Override @Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri) public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
{ {
// This method should be in sync with return ConnectionUriUtils.tryParseJdbcUriParameters(connectUri, allowUnknown);
// - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL() }
// - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
// Postgresql JDBC driver is embedded and thus must be loaded. @Override
Properties properties = Driver.parseURL(connectUri, null); public boolean equals(Object o)
if (properties == null) { {
throw new IAE("Invalid URL format for PostgreSQL: [%s]", connectUri); if (this == o) {
return true;
} }
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size()); if (o == null || getClass() != o.getClass()) {
properties.forEach((k, v) -> keys.add((String) k)); return false;
return keys; }
PostgresqlFirehoseDatabaseConnector that = (PostgresqlFirehoseDatabaseConnector) o;
return connectorConfig.equals(that.connectorConfig);
}
@Override
public int hashCode()
{
return Objects.hash(connectorConfig);
}
@Override
public String toString()
{
return "PostgresqlFirehoseDatabaseConnector{" +
"connectorConfig=" + connectorConfig +
'}';
} }
} }

View File

@ -19,9 +19,16 @@
package org.apache.druid.firehose; package org.apache.druid.firehose;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig; import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -30,9 +37,50 @@ import java.util.Set;
public class PostgresqlFirehoseDatabaseConnectorTest public class PostgresqlFirehoseDatabaseConnectorTest
{ {
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
private static final JdbcAccessSecurityConfig INJECTED_CONF = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
static {
MAPPER.registerModules(new PostgreSQLMetadataStorageModule().getJacksonModules());
MAPPER.setInjectableValues(new InjectableValues.Std().addValue(JdbcAccessSecurityConfig.class, INJECTED_CONF));
}
@Rule @Rule
public final ExpectedException expectedException = ExpectedException.none(); public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws JsonProcessingException
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test";
}
};
PostgresqlFirehoseDatabaseConnector connector = new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
INJECTED_CONF
);
PostgresqlFirehoseDatabaseConnector andBack = MAPPER.readValue(
MAPPER.writeValueAsString(connector),
PostgresqlFirehoseDatabaseConnector.class
);
Assert.assertEquals(connector, andBack);
}
@Test
public void testEqualsAndHashcode()
{
EqualsVerifier.forClass(PostgresqlFirehoseDatabaseConnector.class)
.usingGetClass()
.withNonnullFields("connectorConfig")
.withIgnoredFields("dbi")
.verify();
}
@Test @Test
public void testSuccessWhenNoPropertyInUriAndNoAllowlist() public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
{ {

View File

@ -29,6 +29,8 @@ RUN APACHE_ARCHIVE_MIRROR_HOST=${APACHE_ARCHIVE_MIRROR_HOST} /root/base-setup.sh
FROM druidbase FROM druidbase
ARG MYSQL_VERSION ARG MYSQL_VERSION
ARG MARIA_VERSION
ARG MYSQL_DRIVER_CLASSNAME=com.mysql.jdbc.Driver
ARG CONFLUENT_VERSION ARG CONFLUENT_VERSION
# Verify Java version # Verify Java version
@ -47,16 +49,23 @@ ADD lib/* /usr/local/druid/lib/
# Download the MySQL Java connector # Download the MySQL Java connector
# target path must match the exact path referenced in environment-configs/common # target path must match the exact path referenced in environment-configs/common
RUN wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \ # alternatively: Download the MariaDB Java connector, and pretend it is the mysql connector
-O /usr/local/druid/lib/mysql-connector-java.jar RUN if [ "$MYSQL_DRIVER_CLASSNAME" = "com.mysql.jdbc.Driver" ] ; \
then wget -q "https://repo1.maven.org/maven2/mysql/mysql-connector-java/$MYSQL_VERSION/mysql-connector-java-$MYSQL_VERSION.jar" \
-O /usr/local/druid/lib/mysql-connector-java.jar; \
elif [ "$MYSQL_DRIVER_CLASSNAME" = "org.mariadb.jdbc.Driver" ] ; \
then wget -q "https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/$MARIA_VERSION/mariadb-java-client-$MARIA_VERSION.jar" \
-O /usr/local/druid/lib/mysql-connector-java.jar; \
fi
# download kafka protobuf provider
RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \ RUN wget -q "https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/$CONFLUENT_VERSION/kafka-protobuf-provider-$CONFLUENT_VERSION.jar" \
-O /usr/local/druid/lib/kafka-protobuf-provider.jar -O /usr/local/druid/lib/kafka-protobuf-provider.jar
# Add sample data # Add sample data
# touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72.
RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ RUN find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& java -cp "/usr/local/druid/lib/*" -Ddruid.metadata.storage.type=mysql org.apache.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd \ && java -cp "/usr/local/druid/lib/*" -Ddruid.metadata.storage.type=mysql -Ddruid.metadata.mysql.driver.driverClassName=$MYSQL_DRIVER_CLASSNAME org.apache.druid.cli.Main tools metadata-init --connectURI="jdbc:mysql://localhost:3306/druid" --user=druid --password=diurd \
&& /etc/init.d/mysql stop && /etc/init.d/mysql stop
ADD test-data /test-data ADD test-data /test-data
@ -105,8 +114,9 @@ EXPOSE 8100 8101 8102 8103 8104 8105
EXPOSE 8300 8301 8302 8303 8304 8305 EXPOSE 8300 8301 8302 8303 8304 8305
EXPOSE 9092 9093 EXPOSE 9092 9093
ENV MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME
WORKDIR /var/lib/druid WORKDIR /var/lib/druid
ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \ ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
&& . /druid.sh \ && . /druid.sh \
# Create druid service config files with all the config variables # Create druid service config files with all the config variables
&& setupConfig \ && setupConfig \

View File

@ -101,6 +101,9 @@ setupData()
export AWS_REGION=us-east-1 export AWS_REGION=us-east-1
fi fi
if [ "$MYSQL_DRIVER_CLASSNAME" != "com.mysql.jdbc.Driver" ] ; then
setKey $DRUID_SERVICE druid.metadata.mysql.driver.driverClassName $MYSQL_DRIVER_CLASSNAME
fi
# The SqlInputSource tests in the "input-source" test group require data to be setup in MySQL before running the tests. # The SqlInputSource tests in the "input-source" test group require data to be setup in MySQL before running the tests.
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "input-source" ] ; then if [ "$DRUID_INTEGRATION_TEST_GROUP" = "input-source" ] ; then

View File

@ -491,6 +491,7 @@
<DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER> <DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>${docker.run.skip}</DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER>
<DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER> <DRUID_INTEGRATION_TEST_INDEXER>${it.indexer}</DRUID_INTEGRATION_TEST_INDEXER>
<MYSQL_VERSION>${mysql.version}</MYSQL_VERSION> <MYSQL_VERSION>${mysql.version}</MYSQL_VERSION>
<MARIA_VERSION>2.7.3</MARIA_VERSION>
<CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION> <CONFLUENT_VERSION>5.5.1</CONFLUENT_VERSION>
<KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION> <KAFKA_VERSION>${apache.kafka.version}</KAFKA_VERSION>
<ZK_VERSION>${zookeeper.version}</ZK_VERSION> <ZK_VERSION>${zookeeper.version}</ZK_VERSION>

View File

@ -22,21 +22,21 @@ set -e
if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ] if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ]
then then
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version" echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is not set. Building druid-cluster with default Java version"
docker build -t druid/cluster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION $SHARED_DIR/docker docker build -t druid/cluster --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME $SHARED_DIR/docker
else else
echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" echo "\$DRUID_INTEGRATION_TEST_JVM_RUNTIME is set with value ${DRUID_INTEGRATION_TEST_JVM_RUNTIME}"
case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in case "${DRUID_INTEGRATION_TEST_JVM_RUNTIME}" in
8) 8)
echo "Build druid-cluster with Java 8" echo "Build druid-cluster with Java 8"
docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker docker build -t druid/cluster --build-arg JDK_VERSION=8-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;; ;;
11) 11)
echo "Build druid-cluster with Java 11" echo "Build druid-cluster with Java 11"
docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker docker build -t druid/cluster --build-arg JDK_VERSION=11-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg MYSQL_DRIVER_CLASSNAME --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;; ;;
15) 15)
echo "Build druid-cluster with Java 15" echo "Build druid-cluster with Java 15"
docker build -t druid/cluster --build-arg JDK_VERSION=15-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker docker build -t druid/cluster --build-arg JDK_VERSION=15-slim --build-arg ZK_VERSION --build-arg KAFKA_VERSION --build-arg CONFLUENT_VERSION --build-arg MYSQL_VERSION --build-arg MARIA_VERSION --build-arg USE_MARIA --build-arg APACHE_ARCHIVE_MIRROR_HOST $SHARED_DIR/docker
;; ;;
*) *)
echo "Invalid JVM Runtime given. Stopping" echo "Invalid JVM Runtime given. Stopping"

View File

@ -98,6 +98,7 @@
<codehaus.jackson.version>1.9.13</codehaus.jackson.version> <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<log4j.version>2.8.2</log4j.version> <log4j.version>2.8.2</log4j.version>
<mysql.version>5.1.48</mysql.version> <mysql.version>5.1.48</mysql.version>
<mariadb.version>2.7.3</mariadb.version>
<netty3.version>3.10.6.Final</netty3.version> <netty3.version>3.10.6.Final</netty3.version>
<netty4.version>4.1.63.Final</netty4.version> <netty4.version>4.1.63.Final</netty4.version>
<postgresql.version>42.2.14</postgresql.version> <postgresql.version>42.2.14</postgresql.version>

View File

@ -95,7 +95,7 @@ public abstract class SQLFirehoseDatabaseConnector
// You don't want to do anything with properties. // You don't want to do anything with properties.
return; return;
} }
final Set<String> propertyKeyFromConnectURL = findPropertyKeysFromConnectURL(urlString); final Set<String> propertyKeyFromConnectURL = findPropertyKeysFromConnectURL(urlString, securityConfig.isAllowUnknownJdbcUrlFormat());
ConnectionUriUtils.throwIfPropertiesAreNotAllowed( ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
propertyKeyFromConnectURL, propertyKeyFromConnectURL,
securityConfig.getSystemPropertyPrefixes(), securityConfig.getSystemPropertyPrefixes(),
@ -113,5 +113,5 @@ public abstract class SQLFirehoseDatabaseConnector
/** /**
* Extract property keys from the given JDBC URL. * Extract property keys from the given JDBC URL.
*/ */
public abstract Set<String> findPropertyKeysFromConnectURL(String connectUri); public abstract Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown);
} }

View File

@ -298,7 +298,7 @@ public class SqlInputSourceTest
} }
@Override @Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri) public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
{ {
return ImmutableSet.of("user", "create"); return ImmutableSet.of("user", "create");
} }

View File

@ -82,7 +82,7 @@ public class SqlTestUtils
} }
@Override @Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri) public Set<String> findPropertyKeysFromConnectURL(String connectUri, boolean allowUnknown)
{ {
return ImmutableSet.of("user", "create"); return ImmutableSet.of("user", "create");
} }