Add Postgresql SqlFirehose (#6813)

* Add Postgresql SqlFirehose

* Fix Code Style.

* Fix style.

* Fix Import Order.

* Add Line Break before package.
This commit is contained in:
scrawfor 2019-02-15 01:52:03 -05:00 committed by Clint Wylie
parent ee91e27fe7
commit 0fa9000849
4 changed files with 96 additions and 16 deletions

View File

@ -110,7 +110,10 @@ A sample ingest firehose spec is shown below -
#### SqlFirehose
SqlFirehoseFactory can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec. For each query, the results are fetched locally and indexed. If there are multiple queries from which data needs to be indexed, queries are prefetched in the background up to `maxFetchCapacityBytes` bytes.
An example is shown below:
Requires one of the following extensions:
* [MySQL Metadata Store](../ingestion/mysql.html).
* [PostgreSQL Metadata Store](../ingestion/postgresql.html).
```json
{
@ -118,20 +121,19 @@ An example is shown below:
"database": {
"type": "mysql",
"connectorConfig" : {
"connectURI" : "jdbc:mysql://host:port/schema",
"user" : "user",
"password" : "password"
"connectURI" : "jdbc:mysql://host:port/schema",
"user" : "user",
"password" : "password"
}
},
"sqls" : ["SELECT * FROM table1", "SELECT * FROM table2"]
}
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be "sql".||Yes|
|database|Specifies the database connection details.`type` should specify the database type and `connectorConfig` should specify the database connection properties via `connectURI`, `user` and `password`||Yes|
|database|Specifies the database connection details.||Yes|
|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
|prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|
@ -139,6 +141,14 @@ An example is shown below:
|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|
#### Database
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The type of database to query. Valid values are `mysql` and `postgresql`.||Yes|
|connectorConfig|Specifies the database connection properties via `connectURI`, `user` and `password`.||Yes|
### CombiningFirehose
This firehose can be used to combine and merge data from a list of different firehoses.

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
import org.skife.jdbi.v2.DBI;
/**
 * {@link SQLFirehoseDatabaseConnector} variant for PostgreSQL, selected by the
 * JSON type name {@code "postgresql"}.
 *
 * <p>On construction it obtains a {@link BasicDataSource} for the supplied
 * connector config, points that data source at the PostgreSQL JDBC driver, and
 * wraps it in a {@link DBI} that is handed out through {@link #getDBI()}.
 */
@JsonTypeName("postgresql")
public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
{
  /** Fully-qualified class name of the JDBC driver configured on the data source. */
  private static final String DRIVER_CLASS_NAME = "org.postgresql.Driver";

  private final MetadataStorageConnectorConfig connectorConfig;
  private final DBI dbi;

  /**
   * Builds the connector and its backing {@link DBI}.
   *
   * @param connectorConfig connection settings, read from the
   *                        {@code connectorConfig} JSON property
   */
  public PostgresqlFirehoseDatabaseConnector(
      @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig
  )
  {
    this.connectorConfig = connectorConfig;

    final BasicDataSource pooledSource = getDatasource(connectorConfig);
    // Resolve the driver through the class loader that loaded this class.
    // NOTE(review): presumably needed because the driver jar ships with the
    // extension rather than on the main classpath - confirm.
    pooledSource.setDriverClassLoader(getClass().getClassLoader());
    pooledSource.setDriverClassName(DRIVER_CLASS_NAME);

    this.dbi = new DBI(pooledSource);
  }

  /**
   * @return the {@link DBI} created in the constructor
   */
  @Override
  public DBI getDBI()
  {
    return dbi;
  }

  /**
   * @return the connector config this instance was constructed with
   */
  @JsonProperty
  public MetadataStorageConnectorConfig getConnectorConfig()
  {
    return connectorConfig;
  }
}

View File

@ -148,10 +148,10 @@ public class PostgreSQLConnector extends SQLMetadataConnector
return !handle.createQuery(
"SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename ILIKE :tableName"
)
.bind("tableName", tableName)
.map(StringMapper.FIRST)
.list()
.isEmpty();
.bind("tableName", tableName)
.map(StringMapper.FIRST)
.list()
.isEmpty();
}
@Override
@ -184,10 +184,14 @@ public class PostgreSQLConnector extends SQLMetadataConnector
} else {
handle.createStatement(
StringUtils.format(
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"BEGIN;\n"
+
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n"
+
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n"
+
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;"
+
"COMMIT;",
tableName,
keyColumn,

View File

@ -20,9 +20,11 @@
package org.apache.druid.metadata.storage.postgresql;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import org.apache.druid.firehose.PostgresqlFirehoseDatabaseConnector;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
@ -35,6 +37,7 @@ import org.apache.druid.metadata.NoopMetadataStorageProvider;
import org.apache.druid.metadata.PostgreSQLMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.SQLMetadataConnector;
import java.util.Collections;
import java.util.List;
public class PostgreSQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
@ -50,7 +53,12 @@ public class PostgreSQLMetadataStorageModule extends SQLMetadataStorageDruidModu
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
return Collections.singletonList(
new SimpleModule()
.registerSubtypes(
new NamedType(PostgresqlFirehoseDatabaseConnector.class, "postgresql")
)
);
}
@Override