Allow list for JDBC connection properties to address CVE-2021-26919 (#11047)

* Allow list for JDBC connection properties to address CVE-2021-26919

* fix tests for java 11
This commit is contained in:
Jihoon Son 2021-04-01 17:30:47 -07:00 committed by GitHub
parent d7f5293364
commit cfcebc40f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 2219 additions and 70 deletions

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import com.google.common.base.Preconditions;
import java.util.Set;
public final class ConnectionUriUtils
{
// Note: MySQL JDBC connector 8 supports 7 other protocols than just `jdbc:mysql:`
// (https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-jdbc-url-format.html).
// We should consider either expanding recognized mysql protocols or restricting allowed protocols to
// just a basic one.
public static final String MYSQL_PREFIX = "jdbc:mysql:";
public static final String POSTGRES_PREFIX = "jdbc:postgresql:";
/**
* This method checks {@param actualProperties} against {@param allowedProperties} if they are not system properties.
* A property is regarded as a system property if its name starts with a prefix in {@param systemPropertyPrefixes}.
* See org.apache.druid.server.initialization.JDBCAccessSecurityConfig for more details.
*
* If a non-system property that is not allowed is found, this method throws an {@link IllegalArgumentException}.
*/
public static void throwIfPropertiesAreNotAllowed(
Set<String> actualProperties,
Set<String> systemPropertyPrefixes,
Set<String> allowedProperties
)
{
for (String property : actualProperties) {
if (systemPropertyPrefixes.stream().noneMatch(property::startsWith)) {
Preconditions.checkArgument(
allowedProperties.contains(property),
"The property [%s] is not in the allowed list %s",
property,
allowedProperties
);
}
}
}
private ConnectionUriUtils()
{
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
public final class Throwables
{
public static boolean isThrowable(Throwable t, Class<? extends Throwable> searchFor)
{
if (t.getClass().isAssignableFrom(searchFor)) {
return true;
} else {
if (t.getCause() != null) {
return isThrowable(t.getCause(), searchFor);
} else {
return false;
}
}
}
private Throwables()
{
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import com.google.common.collect.ImmutableSet;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
@RunWith(Enclosed.class)
public class ConnectionUriUtilsTest
{
public static class ThrowIfURLHasNotAllowedPropertiesTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testEmptyActualProperties()
{
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
ImmutableSet.of(),
ImmutableSet.of("valid_key1", "valid_key2"),
ImmutableSet.of("system_key1", "system_key2")
);
}
@Test
public void testThrowForNonAllowedProperties()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("The property [invalid_key] is not in the allowed list [valid_key1, valid_key2]");
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
ImmutableSet.of("valid_key1", "invalid_key"),
ImmutableSet.of("system_key1", "system_key2"),
ImmutableSet.of("valid_key1", "valid_key2")
);
}
@Test
public void testAllowedProperties()
{
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
ImmutableSet.of("valid_key2"),
ImmutableSet.of("system_key1", "system_key2"),
ImmutableSet.of("valid_key1", "valid_key2")
);
}
@Test
public void testAllowSystemProperties()
{
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
ImmutableSet.of("system_key1", "valid_key2"),
ImmutableSet.of("system_key1", "system_key2"),
ImmutableSet.of("valid_key1", "valid_key2")
);
}
@Test
public void testMatchSystemProperties()
{
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
ImmutableSet.of("system_key1.1", "system_key1.5", "system_key11.11", "valid_key2"),
ImmutableSet.of("system_key1", "system_key2"),
ImmutableSet.of("valid_key1", "valid_key2")
);
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import org.junit.Assert;
import org.junit.Test;
public class ThrowablesTest
{
@Test
public void testIsThrowableItself()
{
Assert.assertTrue(Throwables.isThrowable(new NoClassDefFoundError(), NoClassDefFoundError.class));
}
@Test
public void testIsThrowableNestedThrowable()
{
Assert.assertTrue(
Throwables.isThrowable(new RuntimeException(new NoClassDefFoundError()), NoClassDefFoundError.class)
);
}
@Test
public void testIsThrowableNonTarget()
{
Assert.assertFalse(
Throwables.isThrowable(new RuntimeException(new ClassNotFoundException()), NoClassDefFoundError.class)
);
}
}

View File

@ -537,6 +537,25 @@ the [HTTP input source](../ingestion/native-batch.md#http-input-source) and the
|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|["http", "https"]|
### Ingestion Security Configuration
#### JDBC Connections to External Databases
You can use the following properties to specify permissible JDBC options for:
- [SQL input source](../ingestion/native-batch.md#sql-input-source)
- [SQL firehose](../ingestion/native-batch.md#sqlfirehose),
- [globally cached JDBC lookups](../development/extensions-core/lookups-cached-global.md#jdbc-lookup)
- [JDBC Data Fetcher for per-lookup caching](../development/extensions-core/druid-lookups.md#data-fetcher-layer).
These properties do not apply to metadata storage connections.
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.access.jdbc.enforceAllowedProperties`|Boolean|When true, Druid applies `druid.access.jdbc.allowedProperties` to JDBC connections starting with `jdbc:postgresql:` or `jdbc:mysql:`. When false, Druid allows any kind of JDBC connections without JDBC property validation. This config is deprecated and will be removed in a future release.|false|
|`druid.access.jdbc.allowedProperties`|List of JDBC properties|Defines a list of allowed JDBC properties. Druid always enforces the list for all JDBC connections starting with `jdbc:postgresql:` or `jdbc:mysql:` if `druid.access.jdbc.enforceAllowedProperties` is set to true.<br/><br/>This option is tested against MySQL connector 5.1.48 and PostgreSQL connector 42.2.14. Other connector versions might not work.|["useSSL", "requireSSL", "ssl", "sslmode"]|
|`druid.access.jdbc.allowUnknownJdbcUrlFormat`|Boolean|When false, Druid only accepts JDBC connections starting with `jdbc:postgresql:` or `jdbc:mysql:`. When true, Druid allows JDBC connections to any kind of database, but only enforces `druid.access.jdbc.allowedProperties` for PostgreSQL and MySQL.|true|
### Task Logging
If you are running the indexing service in remote mode, the task logs must be stored in S3, Azure Blob Store, Google Cloud Storage or HDFS.

View File

@ -72,7 +72,7 @@ Same for Loading cache, developer can implement a new type of loading cache by i
|Field|Type|Description|Required|default|
|-----|----|-----------|--------|-------|
|dataFetcher|JSON object|Specifies the lookup data fetcher type to use in order to fetch data|yes|null|
|dataFetcher|JSON object|Specifies the lookup data fetcher type for fetching data|yes|null|
|cacheFactory|JSON Object|Cache factory implementation|no |onHeapPolling|
|pollPeriod|Period|polling period |no |null (poll once)|
@ -129,7 +129,7 @@ Guava cache configuration spec.
"type":"loadingLookup",
"dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
"loadingCacheSpec":{"type":"guava"},
"reverseLoadingCacheSpec":{"type":"guava", "maximumSize":500000, "expireAfterAccess":100000, "expireAfterAccess":10000}
"reverseLoadingCacheSpec":{"type":"guava", "maximumSize":500000, "expireAfterAccess":100000, "expireAfterWrite":10000}
}
```
@ -150,6 +150,16 @@ Off heap cache is backed by [MapDB](http://www.mapdb.org/) implementation. MapDB
"type":"loadingLookup",
"dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
"loadingCacheSpec":{"type":"mapDb", "maxEntriesSize":100000},
"reverseLoadingCacheSpec":{"type":"mapDb", "maxStoreSize":5, "expireAfterAccess":100000, "expireAfterAccess":10000}
"reverseLoadingCacheSpec":{"type":"mapDb", "maxStoreSize":5, "expireAfterAccess":100000, "expireAfterWrite":10000}
}
```
### JDBC Data Fetcher
|Field|Type|Description|Required|default|
|-----|----|-----------|--------|-------|
|`connectorConfig`|JSON object|Specifies the database connection details. You can set `connectURI`, `user` and `password`. You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../../configuration/index.md#jdbc-connections-to-external-databases) for more details.|yes||
|`table`|string|The table name to read from.|yes||
|`keyColumn`|string|The column name that contains the lookup key.|yes||
|`valueColumn`|string|The column name that contains the lookup value.|yes||
|`streamingFetchSize`|int|Fetch size used in JDBC connections.|no|1000|

View File

@ -64,7 +64,6 @@ Globally cached lookups can be specified as part of the [cluster wide config for
"extractionNamespace": {
"type": "jdbc",
"connectorConfig": {
"createTables": true,
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",
"password": "diurd"
@ -107,7 +106,6 @@ In a simple case where only one [tier](../../querying/lookups.md#dynamic-configu
"extractionNamespace": {
"type": "jdbc",
"connectorConfig": {
"createTables": true,
"connectURI": "jdbc:mysql:\/\/localhost:3306\/druid",
"user": "druid",
"password": "diurd"
@ -136,7 +134,6 @@ Where the Coordinator endpoint `/druid/coordinator/v1/lookups/realtime_customer2
"extractionNamespace": {
"type": "jdbc",
"connectorConfig": {
"createTables": true,
"connectURI": "jdbc:mysql://localhost:3306/druid",
"user": "druid",
"password": "diurd"
@ -347,7 +344,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`connectorConfig`|The connector config to use|Yes||
|`connectorConfig`|The connector config to use. You can set `connectURI`, `user` and `password`. You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../../configuration/index.md#jdbc-connections-to-external-databases) for more details.|Yes||
|`table`|The table which contains the key value pairs|Yes||
|`keyColumn`|The column in `table` which contains the keys|Yes||
|`valueColumn`|The column in `table` which contains the values|Yes||
@ -359,7 +356,6 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
{
"type":"jdbc",
"connectorConfig":{
"createTables":true,
"connectURI":"jdbc:mysql://localhost:3306/druid",
"user":"druid",
"password":"diurd"

View File

@ -105,7 +105,6 @@ Copy or symlink this file to `extensions/mysql-metadata-storage` under the distr
|`druid.metadata.mysql.ssl.enabledSSLCipherSuites`|Overrides the existing cipher suites with these cipher suites.|none|no|
|`druid.metadata.mysql.ssl.enabledTLSProtocols`|Overrides the TLS protocols with these protocols.|none|no|
### MySQL Firehose
The MySQL extension provides an implementation of an [SqlFirehose](../../ingestion/native-batch.md#firehoses-deprecated) which can be used to ingest data into Druid from a MySQL database.

View File

@ -1409,7 +1409,7 @@ Please refer to the Recommended practices section below before using this input
|property|description|required?|
|--------|-----------|---------|
|type|This should be "sql".|Yes|
|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support and this extension must be loaded into Druid. For database types `mysql` and `postgresql`, the `connectorConfig` support is provided by [mysql-metadata-storage](../development/extensions-core/mysql.md) and [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extensions respectively.|Yes|
|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/><ul><li>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`</li><li> [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.</li></ul><br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../configuration/index.md#jdbc-connections-to-external-databases) for more details.|Yes|
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.|Yes|
@ -1763,7 +1763,7 @@ Requires one of the following extensions:
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "sql".||Yes|
|database|Specifies the database connection details.||Yes|
|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/><ul><li>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`</li><li> [postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.</li></ul><br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../configuration/index.md#jdbc-connections-to-external-databases) for more details.||Yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
|prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|

View File

@ -110,12 +110,15 @@
<artifactId>jsr311-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- Included to improve the out-of-the-box experience for supported JDBC connectors -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Tests -->

View File

@ -19,17 +19,28 @@
package org.apache.druid.query.lookup.namespace;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.mysql.jdbc.NonRegisteringDriver;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.ConnectionUriUtils;
import org.apache.druid.utils.Throwables;
import org.joda.time.Period;
import org.postgresql.Driver;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.sql.SQLException;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
/**
*
@ -61,11 +72,15 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
@NotNull @JsonProperty(value = "valueColumn", required = true) final String valueColumn,
@JsonProperty(value = "tsColumn", required = false) @Nullable final String tsColumn,
@JsonProperty(value = "filter", required = false) @Nullable final String filter,
@Min(0) @JsonProperty(value = "pollPeriod", required = false) @Nullable final Period pollPeriod
@Min(0) @JsonProperty(value = "pollPeriod", required = false) @Nullable final Period pollPeriod,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI");
// Check the properties in the connection URL. Note that JdbcExtractionNamespace doesn't use
// MetadataStorageConnectorConfig.getDbcpProperties(). If we want to use them,
// those DBCP properties should be validated using the same logic.
checkConnectionURL(connectorConfig.getConnectURI(), securityConfig);
this.table = Preconditions.checkNotNull(table, "table");
this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
@ -74,6 +89,89 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod;
}
/**
* Check the given URL whether it contains non-allowed properties.
*
* This method should be in sync with the following methods:
*
* - {@code org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()}
* - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
* - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
*
* @see JdbcAccessSecurityConfig#getAllowedProperties()
*/
private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
{
Preconditions.checkNotNull(url, "connectorConfig.connectURI");
if (!securityConfig.isEnforceAllowedProperties()) {
// You don't want to do anything with properties.
return;
}
@Nullable final Properties properties; // null when url has an invalid format
if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
try {
NonRegisteringDriver driver = new NonRegisteringDriver();
properties = driver.parseURL(url, null);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
try {
properties = Driver.parseURL(url, null);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("org/postgresql/Driver")) {
throw new RuntimeException(
"Failed to find PostgreSQL driver class. "
+ "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else {
if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
properties = new Properties();
} else {
// unknown format but it is not allowed
throw new IAE("Unknown JDBC connection scheme: %s", url.split(":")[1]);
}
}
if (properties == null) {
// There is something wrong with the URL format.
throw new IAE("Invalid URL format [%s]", url);
}
final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> propertyKeys.add((String) k));
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
propertyKeys,
securityConfig.getSystemPropertyPrefixes(),
securityConfig.getAllowedProperties()
);
}
public MetadataStorageConnectorConfig getConnectorConfig()
{
return connectorConfig;

View File

@ -0,0 +1,431 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.lookup.namespace;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.joda.time.Period;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import java.util.Set;
@RunWith(Enclosed.class)
public class JdbcExtractionNamespaceUrlCheckTest
{
private static final String TABLE_NAME = "abstractDbRenameTest";
private static final String KEY_NAME = "keyName";
private static final String VAL_NAME = "valName";
private static final String TS_COLUMN = "tsColumn";
public static class MySqlTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
{
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testThrowWhenUrlHasNonAllowedPropertiesWhenEnforcingAllowedProperties()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testWhenUrlHasNonAllowedPropertiesWhenNotEnforcingAllowedProperties()
{
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return false;
}
}
);
}
@Test
public void testWhenInvalidUrlFormat()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]");
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql:/invalid-url::3006";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
}
public static class PostgreSqlTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
{
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:5432/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testThrowWhenUrlHasNonAllowedPropertiesWhenEnforcingAllowedProperties()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testWhenUrlHasNonAllowedPropertiesWhenNotEnforcingAllowedProperties()
{
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return false;
}
}
);
}
@Test
public void testWhenInvalidUrlFormat()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]");
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://invalid-url::3006";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
}
public static class UnknownSchemeTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testThrowWhenUnknownFormatIsNotAllowed()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Unknown JDBC connection scheme: mydb");
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isAllowUnknownJdbcUrlFormat()
{
return false;
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testSkipUrlParsingWhenUnknownFormatIsAllowed()
{
new JdbcExtractionNamespace(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_NAME,
VAL_NAME,
TS_COLUMN,
"some filter",
new Period(10),
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isAllowUnknownJdbcUrlFormat()
{
return true;
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.namespace.cache.CacheScheduler;
import org.apache.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -42,7 +43,7 @@ import java.util.Collections;
public class JdbcCacheGeneratorTest
{
private static final MetadataStorageConnectorConfig MISSING_METADATA_STORAGE_CONNECTOR_CONFIG =
createMetadataStorageConnectorConfig("postgresql");
createMetadataStorageConnectorConfig("mydb");
private static final CacheScheduler.EntryImpl<JdbcExtractionNamespace> KEY =
EasyMock.mock(CacheScheduler.EntryImpl.class);
@ -127,7 +128,8 @@ public class JdbcCacheGeneratorTest
"valueColumn",
tsColumn,
"filter",
Period.ZERO
Period.ZERO,
new JdbcAccessSecurityConfig()
);
}
}

View File

@ -19,12 +19,15 @@
package org.apache.druid.server.lookup.namespace.cache;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
@ -34,7 +37,7 @@ import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.ExtractionNamespace;
import org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.namespace.JdbcCacheGenerator;
import org.apache.druid.server.lookup.namespace.NamespaceExtractionConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -73,6 +76,7 @@ public class JdbcExtractionNamespaceTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private static final Logger log = new Logger(JdbcExtractionNamespaceTest.class);
private static final String TABLE_NAME = "abstractDbRenameTest";
private static final String KEY_NAME = "keyName";
@ -376,7 +380,8 @@ public class JdbcExtractionNamespaceTest
VAL_NAME,
tsColumn,
null,
new Period(0)
new Period(0),
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
CacheSchedulerTest.waitFor(entry);
@ -407,7 +412,8 @@ public class JdbcExtractionNamespaceTest
VAL_NAME,
tsColumn,
FILTER_COLUMN + "='1'",
new Period(0)
new Period(0),
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
CacheSchedulerTest.waitFor(entry);
@ -472,6 +478,7 @@ public class JdbcExtractionNamespaceTest
@Test
public void testSerde() throws IOException
{
final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig();
final JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace(
derbyConnectorRule.getMetadataConnectorConfig(),
TABLE_NAME,
@ -479,11 +486,14 @@ public class JdbcExtractionNamespaceTest
VAL_NAME,
tsColumn,
"some filter",
new Period(10)
new Period(10),
securityConfig
);
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(new Std().addValue(JdbcAccessSecurityConfig.class, securityConfig));
final ExtractionNamespace extractionNamespace2 = ServerTestHelper.MAPPER.readValue(
ServerTestHelper.MAPPER.writeValueAsBytes(extractionNamespace),
final ExtractionNamespace extractionNamespace2 = mapper.readValue(
mapper.writeValueAsBytes(extractionNamespace),
ExtractionNamespace.class
);
@ -500,7 +510,8 @@ public class JdbcExtractionNamespaceTest
VAL_NAME,
tsColumn,
null,
new Period(10)
new Period(10),
new JdbcAccessSecurityConfig()
);
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);

View File

@ -96,12 +96,15 @@
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<!-- Included to improve the out-of-the-box experience for supported JDBC connectors -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Tests -->

View File

@ -19,15 +19,23 @@
package org.apache.druid.server.lookup.jdbc;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mysql.jdbc.NonRegisteringDriver;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.DataFetcher;
import org.apache.druid.utils.ConnectionUriUtils;
import org.apache.druid.utils.Throwables;
import org.postgresql.Driver;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
@ -39,6 +47,8 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
public class JdbcDataFetcher implements DataFetcher<String, String>
@ -71,12 +81,16 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
@JsonProperty("table") String table,
@JsonProperty("keyColumn") String keyColumn,
@JsonProperty("valueColumn") String valueColumn,
@JsonProperty("streamingFetchSize") @Nullable Integer streamingFetchSize
@JsonProperty("streamingFetchSize") @Nullable Integer streamingFetchSize,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
this.streamingFetchSize = streamingFetchSize == null ? DEFAULT_STREAMING_FETCH_SIZE : streamingFetchSize;
Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI");
// Check the properties in the connection URL. Note that JdbcDataFetcher doesn't use
// MetadataStorageConnectorConfig.getDbcpProperties(). If we want to use them,
// those DBCP properties should be validated using the same logic.
checkConnectionURL(connectorConfig.getConnectURI(), securityConfig);
this.table = Preconditions.checkNotNull(table, "table");
this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
@ -107,6 +121,89 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
dbi.registerMapper(new KeyValueResultSetMapper(keyColumn, valueColumn));
}
/**
* Check the given URL whether it contains non-allowed properties.
*
* This method should be in sync with the following methods:
*
* - {@code org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()}
* - {@code org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
* - {@code org.apache.druid.firehose.sql.PostgresqlFirehoseDatabaseConnector.findPropertyKeysFromConnectURL()}
*
* @see JdbcAccessSecurityConfig#getAllowedProperties()
*/
private static void checkConnectionURL(String url, JdbcAccessSecurityConfig securityConfig)
{
Preconditions.checkNotNull(url, "connectorConfig.connectURI");
if (!securityConfig.isEnforceAllowedProperties()) {
// You don't want to do anything with properties.
return;
}
@Nullable final Properties properties;
if (url.startsWith(ConnectionUriUtils.MYSQL_PREFIX)) {
try {
NonRegisteringDriver driver = new NonRegisteringDriver();
properties = driver.parseURL(url, null);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else if (url.startsWith(ConnectionUriUtils.POSTGRES_PREFIX)) {
try {
properties = Driver.parseURL(url, null);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("org/postgresql/Driver")) {
throw new RuntimeException(
"Failed to find PostgreSQL driver class. "
+ "Please check the PostgreSQL connector version 42.2.14 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
} else {
if (securityConfig.isAllowUnknownJdbcUrlFormat()) {
properties = new Properties();
} else {
// unknown format but it is not allowed
throw new IAE("Unknown JDBC connection scheme: %s", url.split(":")[1]);
}
}
if (properties == null) {
// There is something wrong with the URL format.
throw new IAE("Invalid URL format [%s]", url);
}
final Set<String> propertyKeys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> propertyKeys.add((String) k));
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
propertyKeys,
securityConfig.getSystemPropertyPrefixes(),
securityConfig.getAllowedProperties()
);
}
@Override
public Iterable<Map.Entry<String, String>> fetchAll()
{
@ -232,7 +329,7 @@ public class JdbcDataFetcher implements DataFetcher<String, String>
if (e.getMessage().contains("No suitable driver found")) {
throw new ISE(
e,
"JDBC driver JAR files missing from extensions/druid-lookups-cached-single directory"
"JDBC driver JAR files missing in the classpath"
);
} else {
throw e;

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.lookup.jdbc;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@ -26,6 +27,7 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.server.lookup.DataFetcher;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@ -74,7 +76,8 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
"tableName",
"keyColumn",
"valueColumn",
100
100,
new JdbcAccessSecurityConfig()
);
handle = derbyConnectorRule.getConnector().getDBI().open();
@ -155,14 +158,17 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
@Test
public void testSerDesr() throws IOException
{
final JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig();
JdbcDataFetcher jdbcDataFetcher = new JdbcDataFetcher(
new MetadataStorageConnectorConfig(),
"table",
"keyColumn",
"ValueColumn",
100
100,
securityConfig
);
DefaultObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(new Std().addValue(JdbcAccessSecurityConfig.class, securityConfig));
String jdbcDataFetcherSer = mapper.writeValueAsString(jdbcDataFetcher);
Assert.assertEquals(jdbcDataFetcher, mapper.readerFor(DataFetcher.class).readValue(jdbcDataFetcherSer));
}
@ -213,7 +219,8 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
null
null,
new JdbcAccessSecurityConfig()
);
}
@ -244,7 +251,7 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
private void test(Runnable runnable)
{
exception.expect(IllegalStateException.class);
exception.expectMessage("JDBC driver JAR files missing from extensions/druid-lookups-cached-single directory");
exception.expectMessage("JDBC driver JAR files missing in the classpath");
runnable.run();
}
@ -252,8 +259,8 @@ public class JdbcDataFetcherTest extends InitializedNullHandlingTest
@SuppressWarnings("SameParameterValue")
private static MetadataStorageConnectorConfig createMissingMetadataStorageConnectorConfig()
{
String type = "postgresql";
String json = "{\"connectURI\":\"jdbc:" + type + "://localhost:5432\"}";
String type = "mydb";
String json = "{\"connectURI\":\"jdbc:" + type + "://localhost:3306/\"}";
try {
return new ObjectMapper().readValue(json, MetadataStorageConnectorConfig.class);
}

View File

@ -0,0 +1,409 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.lookup.jdbc;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import java.util.Set;
@RunWith(Enclosed.class)
public class JdbcDataFetcherUrlCheckTest
{
private static final String TABLE_NAME = "tableName";
private static final String KEY_COLUMN = "keyColumn";
private static final String VALUE_COLUMN = "valueColumn";
public static class MySqlTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
{
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testThrowWhenUrlHasDisallowedPropertiesWhenEnforcingAllowedProperties()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testWhenUrlHasDisallowedPropertiesWhenNotEnforcingAllowedProperties()
{
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return false;
}
}
);
}
@Test
public void testWhenInvalidUrlFormat()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:mysql:/invalid-url::3006]");
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql:/invalid-url::3006";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
}
public static class PostgreSqlTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testCreateInstanceWhenUrlHasOnlyAllowedProperties()
{
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:5432/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testThrowWhenUrlHasDisallowedPropertiesWhenEnforcingAllowedProperties()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("The property [invalid_key1] is not in the allowed list [valid_key1, valid_key2]");
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testWhenUrlHasDisallowedPropertiesWhenNotEnforcingAllowedProperties()
{
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:5432/db?invalid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return false;
}
}
);
}
@Test
public void testWhenInvalidUrlFormat()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Invalid URL format [jdbc:postgresql://invalid-url::3006]");
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://invalid-url::3006";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
}
public static class UnknownSchemeTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testThrowWhenUnknownFormatIsNotAllowed()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Unknown JDBC connection scheme: mydb");
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isAllowUnknownJdbcUrlFormat()
{
return false;
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
@Test
public void testSkipUrlParsingWhenUnknownFormatIsAllowed()
{
new JdbcDataFetcher(
new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mydb://localhost:5432/db?valid_key1=val1&valid_key2=val2";
}
},
TABLE_NAME,
KEY_COLUMN,
VALUE_COLUMN,
100,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("valid_key1", "valid_key2");
}
@Override
public boolean isAllowUnknownJdbcUrlFormat()
{
return true;
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
}
);
}
}
}

View File

@ -88,6 +88,11 @@
<artifactId>commons-dbcp2</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -19,13 +19,24 @@
package org.apache.druid.firehose.sql;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import com.mysql.jdbc.NonRegisteringDriver;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.Throwables;
import org.skife.jdbi.v2.DBI;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;
@JsonTypeName("mysql")
public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
@ -33,12 +44,14 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
private final DBI dbi;
private final MetadataStorageConnectorConfig connectorConfig;
@JsonCreator
public MySQLFirehoseDatabaseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
this.connectorConfig = connectorConfig;
final BasicDataSource datasource = getDatasource(connectorConfig);
final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("com.mysql.jdbc.Driver");
this.dbi = new DBI(datasource);
@ -55,4 +68,39 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
{
return dbi;
}
@Override
public Set<String> findPropertyKeysFromConnectURL(String connectUrl)
{
// This method should be in sync with
// - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()
// - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
Properties properties;
try {
NonRegisteringDriver driver = new NonRegisteringDriver();
properties = driver.parseURL(connectUrl, null);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
catch (Throwable e) {
if (Throwables.isThrowable(e, NoClassDefFoundError.class)
|| Throwables.isThrowable(e, ClassNotFoundException.class)) {
if (e.getMessage().contains("com/mysql/jdbc/NonRegisteringDriver")) {
throw new RuntimeException(
"Failed to find MySQL driver class. Please check the MySQL connector version 5.1.48 is in the classpath",
e
);
}
}
throw new RuntimeException(e);
}
if (properties == null) {
throw new IAE("Invalid URL format for MySQL: [%s]", connectUrl);
}
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> keys.add((String) k));
return keys;
}
}

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.firehose.sql;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Set;
public class MySQLFirehoseDatabaseConnectorTest
{
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testSuccessWhenAllowlistAndNoProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user"));
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFailWhenNoAllowlistAndHaveProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of(""));
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testSuccessOnlyValidProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(
ImmutableSet.of("user", "password", "keyonly", "etc")
);
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFailOnlyInvalidProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("none", "nonenone"));
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFailValidAndInvalidProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user", "nonenone"));
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testIgnoreInvalidPropertyWhenNotEnforcingAllowList()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:mysql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("user", "nonenone");
}
};
new MySQLFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFindPropertyKeysFromInvalidConnectUrl()
{
final String url = "jdbc:mysql:/invalid-url::3006";
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return url;
}
};
MySQLFirehoseDatabaseConnector connector = new MySQLFirehoseDatabaseConnector(
connectorConfig,
new JdbcAccessSecurityConfig()
);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(StringUtils.format("Invalid URL format for MySQL: [%s]", url));
connector.findPropertyKeysFromConnectURL(url);
}
private static JdbcAccessSecurityConfig newSecurityConfigEnforcingAllowList(Set<String> allowedProperties)
{
return new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return allowedProperties;
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
};
}
}

View File

@ -19,13 +19,22 @@
package org.apache.druid.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Sets;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.postgresql.Driver;
import org.skife.jdbi.v2.DBI;
import java.util.Properties;
import java.util.Set;
@JsonTypeName("postgresql")
public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
@ -33,12 +42,14 @@ public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConn
private final DBI dbi;
private final MetadataStorageConnectorConfig connectorConfig;
@JsonCreator
public PostgresqlFirehoseDatabaseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
this.connectorConfig = connectorConfig;
final BasicDataSource datasource = getDatasource(connectorConfig);
final BasicDataSource datasource = getDatasource(connectorConfig, securityConfig);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.postgresql.Driver");
this.dbi = new DBI(datasource);
@ -55,4 +66,21 @@ public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConn
{
return dbi;
}
@Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri)
{
// This method should be in sync with
// - org.apache.druid.server.lookup.jdbc.JdbcDataFetcher.checkConnectionURL()
// - org.apache.druid.query.lookup.namespace.JdbcExtractionNamespace.checkConnectionURL()
// Postgresql JDBC driver is embedded and thus must be loaded.
Properties properties = Driver.parseURL(connectUri, null);
if (properties == null) {
throw new IAE("Invalid URL format for PostgreSQL: [%s]", connectUri);
}
Set<String> keys = Sets.newHashSetWithExpectedSize(properties.size());
properties.forEach((k, v) -> keys.add((String) k));
return keys;
}
}

View File

@ -57,7 +57,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
private volatile Boolean canUpsert;
private final String dbTableSchema;
@Inject
public PostgreSQLConnector(
Supplier<MetadataStorageConnectorConfig> config,

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.firehose;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Set;
public class PostgresqlFirehoseDatabaseConnectorTest
{
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Test
public void testSuccessWhenNoPropertyInUriAndNoAllowlist()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testSuccessWhenAllowlistAndNoProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user"));
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFailWhenNoAllowlistAndHaveProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of(""));
expectedException.expectMessage("is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testSuccessOnlyValidProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(
ImmutableSet.of("user", "password", "keyonly", "etc")
);
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFailOnlyInvalidProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("none", "nonenone"));
expectedException.expectMessage("is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testFailValidAndInvalidProperty()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user", "nonenone"));
expectedException.expectMessage("is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
@Test
public void testIgnoreInvalidPropertyWhenNotEnforcingAllowList()
{
MetadataStorageConnectorConfig connectorConfig = new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()
{
return "jdbc:postgresql://localhost:3306/test?user=maytas&password=secret&keyonly";
}
};
JdbcAccessSecurityConfig securityConfig = new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("user", "nonenone");
}
};
new PostgresqlFirehoseDatabaseConnector(
connectorConfig,
securityConfig
);
}
private static JdbcAccessSecurityConfig newSecurityConfigEnforcingAllowList(Set<String> allowedProperties)
{
return new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return allowedProperties;
}
@Override
public boolean isEnforceAllowedProperties()
{
return true;
}
};
}
}

View File

@ -63,6 +63,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumModule;
import org.apache.druid.server.emitter.EmitterModule;
import org.apache.druid.server.initialization.AuthenticatorMapperModule;
import org.apache.druid.server.initialization.AuthorizerMapperModule;
import org.apache.druid.server.initialization.ExternalStorageAccessSecurityModule;
import org.apache.druid.server.initialization.jetty.JettyServerModule;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.server.security.TLSCertificateCheckerModule;
@ -411,7 +412,8 @@ public class Initialization
new EscalatorModule(),
new AuthorizerModule(),
new AuthorizerMapperModule(),
new StartupLoggingModule()
new StartupLoggingModule(),
new ExternalStorageAccessSecurityModule()
);
ModuleList actualModules = new ModuleList(baseInjector);

View File

@ -42,9 +42,18 @@ public class BasicDataSourceExt extends BasicDataSource
{
private static final Logger LOGGER = new Logger(BasicDataSourceExt.class);
private Properties connectionProperties;
private final MetadataStorageConnectorConfig connectorConfig;
/**
* The properties that will be used for the JDBC connection.
*
* Note that these properties are not currently checked against any security configuration such as
* an allow list for JDBC properties. Instead, they are supposed to be checked before adding to this class.
*
* @see SQLFirehoseDatabaseConnector#validateConfigs
*/
private Properties connectionProperties;
public BasicDataSourceExt(MetadataStorageConnectorConfig connectorConfig)
{
this.connectorConfig = connectorConfig;

View File

@ -21,8 +21,11 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.ConnectionUriUtils;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
@ -32,6 +35,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public abstract class SQLFirehoseDatabaseConnector
@ -62,8 +66,15 @@ public abstract class SQLFirehoseDatabaseConnector
|| (e instanceof DBIException && isTransientException(e.getCause())));
}
protected BasicDataSource getDatasource(MetadataStorageConnectorConfig connectorConfig)
protected BasicDataSource getDatasource(
MetadataStorageConnectorConfig connectorConfig,
JdbcAccessSecurityConfig securityConfig
)
{
// We validate only the connection URL here as all properties will be read from only the URL except
// users and password. If we want to allow another way to specify user properties such as using
// MetadataStorageConnectorConfig.getDbcpProperties(), those properties should be validated as well.
validateConfigs(connectorConfig.getConnectURI(), securityConfig);
BasicDataSource dataSource = new BasicDataSourceExt(connectorConfig);
dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword());
@ -75,6 +86,23 @@ public abstract class SQLFirehoseDatabaseConnector
return dataSource;
}
private void validateConfigs(String urlString, JdbcAccessSecurityConfig securityConfig)
{
if (Strings.isNullOrEmpty(urlString)) {
throw new IllegalArgumentException("connectURI cannot be null or empty");
}
if (!securityConfig.isEnforceAllowedProperties()) {
// You don't want to do anything with properties.
return;
}
final Set<String> propertyKeyFromConnectURL = findPropertyKeysFromConnectURL(urlString);
ConnectionUriUtils.throwIfPropertiesAreNotAllowed(
propertyKeyFromConnectURL,
securityConfig.getSystemPropertyPrefixes(),
securityConfig.getAllowedProperties()
);
}
public String getValidationQuery()
{
return "SELECT 1";
@ -82,5 +110,8 @@ public abstract class SQLFirehoseDatabaseConnector
public abstract DBI getDBI();
/**
* Extract property keys from the given JDBC URL.
*/
public abstract Set<String> findPropertyKeysFromConnectURL(String connectUri);
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.initialization;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.initialization.DruidModule;
import java.util.List;
public class ExternalStorageAccessSecurityModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.access.jdbc", JdbcAccessSecurityConfig.class);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.initialization;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
/**
* A config class that applies to all JDBC connections to other databases.
*
* @see org.apache.druid.utils.ConnectionUriUtils
*/
public class JdbcAccessSecurityConfig
{
static final Set<String> DEFAULT_ALLOWED_PROPERTIES = ImmutableSet.of(
// MySQL
"useSSL",
"requireSSL",
// PostgreSQL
"ssl",
"sslmode"
);
/**
* Prefixes of the properties that can be added automatically by {@link java.sql.Driver} during
* connection URL parsing. Any properties resulted by connection URL parsing are regarded as
* system properties if they start with the prefixes in this set.
* Only these non-system properties are checkaed against {@link #getAllowedProperties()}.
*/
private static final Set<String> SYSTEM_PROPERTY_PREFIXES = ImmutableSet.of(
// MySQL
// There can be multiple host and port properties if multiple addresses are specified.
// The pattern of the property name is HOST.i and PORT.i where i is an integer.
"HOST",
"PORT",
"NUM_HOSTS",
"DBNAME",
// PostgreSQL
"PGHOST",
"PGPORT",
"PGDBNAME"
);
@JsonProperty
private Set<String> allowedProperties = DEFAULT_ALLOWED_PROPERTIES;
@JsonProperty
private boolean allowUnknownJdbcUrlFormat = true;
// Enforcing allow list check can break rolling upgrade. This is not good for patch releases
// and is why this config is added. However, from the security point of view, this config
// should be always enabled in production to secure your cluster. As a result, this config
// is deprecated and will be removed in the near future.
@Deprecated
@JsonProperty
private boolean enforceAllowedProperties = false;
@JsonIgnore
public Set<String> getSystemPropertyPrefixes()
{
return SYSTEM_PROPERTY_PREFIXES;
}
public Set<String> getAllowedProperties()
{
return allowedProperties;
}
public boolean isAllowUnknownJdbcUrlFormat()
{
return allowUnknownJdbcUrlFormat;
}
public boolean isEnforceAllowedProperties()
{
return enforceAllowedProperties;
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
@ -43,6 +44,7 @@ import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.Assert;
@ -58,6 +60,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -246,7 +249,17 @@ public class SqlInputSourceTest
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig
)
{
final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
final BasicDataSource datasource = getDatasource(
metadataStorageConnectorConfig,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("user", "create");
}
}
);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
this.dbi = new DBI(datasource);
@ -283,5 +296,11 @@ public class SqlInputSourceTest
{
return dbi;
}
@Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri)
{
return ImmutableSet.of("user", "create");
}
}
}

View File

@ -21,16 +21,20 @@ package org.apache.druid.metadata.input;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.junit.Rule;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.Set;
public class SqlTestUtils
{
@Rule
@ -55,7 +59,17 @@ public class SqlTestUtils
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi
)
{
final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
final BasicDataSource datasource = getDatasource(
metadataStorageConnectorConfig,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
{
return ImmutableSet.of("user", "create");
}
}
);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
this.dbi = dbi;
@ -66,6 +80,12 @@ public class SqlTestUtils
{
return dbi;
}
@Override
public Set<String> findPropertyKeysFromConnectURL(String connectUri)
{
return ImmutableSet.of("user", "create");
}
}
public void createAndUpdateTable(final String tableName, int numEntries)

View File

@ -21,7 +21,6 @@ package org.apache.druid.segment.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.Row;
@ -31,8 +30,6 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.input.SqlTestUtils;
import org.apache.druid.segment.TestHelper;
@ -42,7 +39,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.skife.jdbi.v2.DBI;
import java.io.File;
import java.io.IOException;
@ -236,22 +232,4 @@ public class SqlFirehoseFactoryTest
testUtils.dropTable(TABLE_NAME_2);
}
private static class TestDerbyFirehoseConnector extends SQLFirehoseDatabaseConnector
{
private final DBI dbi;
private TestDerbyFirehoseConnector(MetadataStorageConnectorConfig metadataStorageConnectorConfig, DBI dbi)
{
final BasicDataSource datasource = getDatasource(metadataStorageConnectorConfig);
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
this.dbi = dbi;
}
@Override
public DBI getDBI()
{
return dbi;
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.initialization;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.junit.Assert;
import org.junit.Test;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
public class ExternalStorageAccessSecurityModuleTest
{
@Test
public void testSecurityConfigDefault()
{
JdbcAccessSecurityConfig securityConfig = makeInjectorWithProperties(new Properties()).getInstance(
JdbcAccessSecurityConfig.class
);
Assert.assertNotNull(securityConfig);
Assert.assertEquals(
JdbcAccessSecurityConfig.DEFAULT_ALLOWED_PROPERTIES,
securityConfig.getAllowedProperties()
);
Assert.assertTrue(securityConfig.isAllowUnknownJdbcUrlFormat());
Assert.assertFalse(securityConfig.isEnforceAllowedProperties());
}
@Test
public void testSecurityConfigOverride()
{
Properties properties = new Properties();
properties.setProperty("druid.access.jdbc.allowedProperties", "[\"valid1\", \"valid2\", \"valid3\"]");
properties.setProperty("druid.access.jdbc.allowUnknownJdbcUrlFormat", "false");
properties.setProperty("druid.access.jdbc.enforceAllowedProperties", "true");
JdbcAccessSecurityConfig securityConfig = makeInjectorWithProperties(properties).getInstance(
JdbcAccessSecurityConfig.class
);
Assert.assertNotNull(securityConfig);
Assert.assertEquals(
ImmutableSet.of(
"valid1",
"valid2",
"valid3"
),
securityConfig.getAllowedProperties()
);
Assert.assertFalse(securityConfig.isAllowUnknownJdbcUrlFormat());
Assert.assertTrue(securityConfig.isEnforceAllowedProperties());
}
private static Injector makeInjectorWithProperties(final Properties props)
{
return Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
binder -> {
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
},
new ExternalStorageAccessSecurityModule()
)
);
}
}

View File

@ -370,6 +370,7 @@ reingest
reingesting
reingestion
repo
requireSSL
rollup
rollups
rsync
@ -385,6 +386,8 @@ sharding
skipHeaderRows
smooshed
splittable
ssl
sslmode
stdout
storages
stringified
@ -420,6 +423,7 @@ unparsed
unsetting
untrusted
useFilterCNF
useSSL
uptime
uris
urls