Extension to read and ingest Iceberg data files (#14329)

This adds a new contrib extension, druid-iceberg-extensions, which can be used to ingest data stored in the Apache Iceberg table format. It adds a new input source of type iceberg that connects to a catalog, retrieves the data files associated with an Iceberg table, and provides these data file paths to either an S3 or HDFS input source, depending on the warehouse location.

Two important dependencies associated with Apache Iceberg tables are:

Catalog: This extension supports reading from either a Hive Metastore catalog or a local file-based catalog. Support for AWS Glue is not available yet.
Warehouse: This extension supports reading data files from either HDFS or S3. Adapters for other cloud object stores should be easy to add by extending the AbstractInputSourceAdapter.
Atul Mohan 2023-07-17 20:29:57 -07:00 committed by GitHub
parent 6becd8188e
commit 03d6d395a0
36 changed files with 3049 additions and 3 deletions

View File

@ -633,6 +633,8 @@
<argument>org.apache.druid.extensions.contrib:aliyun-oss-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:opentelemetry-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-iceberg-extensions</argument>
</arguments>
</configuration>
</execution>

View File

@ -0,0 +1,117 @@
---
id: iceberg
title: "Iceberg extension"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
## Iceberg Ingest extension
Apache Iceberg is an open table format for huge analytic datasets. [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) lets you ingest data stored in the Iceberg table format into Apache Druid. To use the Iceberg extension, add `druid-iceberg-extensions` to the list of loaded extensions. See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
Iceberg manages most of its metadata in metadata files stored in object storage. However, it still depends on a metastore to manage a certain amount of metadata.
Iceberg refers to these metastores as catalogs. The Iceberg extension lets you connect to the following Iceberg catalog types:
* Hive metastore catalog
* Local catalog
Druid does not support AWS Glue or REST-based catalogs yet.
For a given catalog, the Iceberg input source reads the table name from the catalog, applies any filters, and extracts all the underlying live data files up to the latest snapshot.
The data files can be in Parquet, ORC, or Avro formats. The data files typically reside in a warehouse location, which can be in HDFS, S3, or the local filesystem.
The `druid-iceberg-extensions` extension relies on the existing input source connectors in Druid to read the data files from the warehouse. Therefore, the Iceberg input source can be considered an intermediate input source that provides file paths to other input source implementations.
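For illustration, the following is a minimal sketch of how these pieces fit together inside the `ioConfig` of an ingestion spec. The table, namespace, and catalog values are placeholders; complete examples are in the [Iceberg input source](../../ingestion/input-sources.md#iceberg-input-source) documentation:
```json
"inputSource": {
  "type": "iceberg",
  "tableName": "iceberg_table",
  "namespace": "iceberg_namespace",
  "icebergCatalog": {
    "type": "hive",
    "warehousePath": "hdfs://warehouse/path",
    "catalogUri": "thrift://hive-metastore.x.com:8970"
  },
  "warehouseSource": {
    "type": "hdfs"
  }
}
```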
## Hive metastore catalog
For Druid to seamlessly talk to the Hive metastore, ensure that the Hive configuration files such as `hive-site.xml` and `core-site.xml` are available in the Druid classpath for peon processes.
You can also specify Hive properties under the `catalogProperties` object in the ingestion spec.
The `druid-iceberg-extensions` extension currently supports only HDFS, S3, and local warehouse directories.
### Read from HDFS warehouse
To read from an HDFS warehouse, load the `druid-hdfs-storage` extension. Druid extracts the data file paths from the Hive metastore catalog and uses the [HDFS input source](../../ingestion/input-sources.md#hdfs-input-source) to ingest these files.
Set the `warehouseSource` type in the ingestion spec to `hdfs`.
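For example, the warehouse source section of the spec can be as simple as:
```json
"warehouseSource": {
  "type": "hdfs"
}
```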
For authenticating with Kerberized clusters, include `principal` and `keytab` properties in the `catalogProperties` object:
```json
"catalogProperties": {
"principal": "krb_principal",
"keytab": "/path/to/keytab"
}
```
Only Kerberos-based authentication is currently supported.
### Read from S3 warehouse
To read from an S3 warehouse, load the `druid-s3-extensions` extension. Druid extracts the data file paths from the Hive metastore catalog and uses `S3InputSource` to ingest these files.
Set the `type` property of the `warehouseSource` object to `s3` in the ingestion spec. If the S3 endpoint for the warehouse is different from the endpoint configured as the deep storage, include the following properties in the `warehouseSource` object to define the S3 endpoint settings:
```json
"warehouseSource": {
"type": "s3",
"endpointConfig": {
"url": "S3_ENDPOINT_URL",
"signingRegion": "us-east-1"
},
"clientConfig": {
"protocol": "http",
"disableChunkedEncoding": true,
"enablePathStyleAccess": true,
"forceGlobalBucketAccessEnabled": false
},
"properties": {
"accessKeyId": {
"type": "default",
"password": "<ACCESS_KEY_ID"
},
"secretAccessKey": {
"type": "default",
"password": "<SECRET_ACCESS_KEY>"
}
}
}
```
This extension uses the [Hadoop AWS module](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/) to connect to S3 and retrieve the metadata and data file paths.
The following properties are required in the `catalogProperties`:
```json
"catalogProperties": {
"fs.s3a.access.key" : "S3_ACCESS_KEY",
"fs.s3a.secret.key" : "S3_SECRET_KEY",
"fs.s3a.endpoint" : "S3_API_ENDPOINT"
}
```
Since the Hadoop AWS connector uses the `s3a` filesystem client, specify the warehouse path with the `s3a://` protocol instead of `s3://`.
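For example, assuming a hypothetical bucket named `my-bucket`, the warehouse path in the catalog definition would take this form:
```json
"warehousePath": "s3a://my-bucket/iceberg/warehouse"
```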
## Local catalog
The local catalog type can be used for catalogs configured on the local filesystem. Set the `icebergCatalog` type to `local`. You can use this catalog for demos or local tests. It is not recommended for production use cases.
Set the `warehouseSource` type to `local` because this catalog only supports reading from a local filesystem.
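The following is a minimal sketch of the catalog and warehouse source portions of a spec for a local catalog. The warehouse path is a placeholder, and depending on your setup the warehouse source may require additional [Local input source](../../ingestion/input-sources.md#local-input-source) properties:
```json
"icebergCatalog": {
  "type": "local",
  "warehousePath": "/tmp/iceberg_warehouse"
},
"warehouseSource": {
  "type": "local"
}
```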
## Known limitations
This section lists the known limitations that apply to the Iceberg extension.
- This extension does not fully use Iceberg features such as snapshotting or schema evolution.
- The Iceberg input source reads every live data file in the Iceberg table up to the latest snapshot, which makes the table scan less performant. It is recommended to use Iceberg filters on partition columns in the ingestion spec to limit the number of data files retrieved. Since Druid doesn't store the last ingested Iceberg snapshot ID, it cannot identify the files created between that snapshot and the latest snapshot on Iceberg.
- It does not handle Iceberg [schema evolution](https://iceberg.apache.org/docs/latest/evolution/) yet. If an existing Iceberg table column is deleted and recreated with the same name, ingesting the table into Druid may bring in data for that column from before it was deleted.
- The Hive catalog has not been tested on Hadoop 2.x.x and is not guaranteed to work with Hadoop 2.

View File

@ -794,6 +794,196 @@ The following is an example of a Combining input source spec:
...
```
## Iceberg input source
> To use the Iceberg input source, add the `druid-iceberg-extensions` extension.
You use the Iceberg input source to read data stored in the Iceberg table format. For a given table, the input source scans up to the latest Iceberg snapshot from the configured Hive catalog. Druid ingests the underlying live data files using the existing input formats.
The Iceberg input source cannot be used on its own, as it relies on the existing input sources to read the data files.
For example, if the warehouse associated with an Iceberg catalog is on S3, you must also load the [`druid-s3-extensions`](../development/extensions-core/s3.md) extension.
The following is a sample spec for an HDFS warehouse source:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "iceberg",
"tableName": "iceberg_table",
"namespace": "iceberg_namespace",
"icebergCatalog": {
"type": "hive",
"warehousePath": "hdfs://warehouse/path",
"catalogUri": "thrift://hive-metastore.x.com:8970",
"catalogProperties": {
"hive.metastore.connect.retries": "1",
"hive.metastore.execute.setugi": "false",
"hive.metastore.kerberos.principal": "KRB_PRINCIPAL",
"hive.metastore.sasl.enabled": "true",
"metastore.catalog.default": "catalog_test",
"hadoop.security.authentication": "kerberos",
"hadoop.security.authorization": "true"
}
},
"icebergFilter": {
"type": "interval",
"filterColumn": "event_time",
"intervals": [
"2023-05-10T19:00:00.000Z/2023-05-10T20:00:00.000Z"
]
},
"warehouseSource": {
"type": "hdfs"
}
},
"inputFormat": {
"type": "parquet"
}
},
...
},
...
```
The following is a sample spec for an S3 warehouse source:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "iceberg",
"tableName": "iceberg_table",
"namespace": "iceberg_namespace",
"icebergCatalog": {
"type": "hive",
"warehousePath": "hdfs://warehouse/path",
"catalogUri": "thrift://hive-metastore.x.com:8970",
"catalogProperties": {
"hive.metastore.connect.retries": "1",
"hive.metastore.execute.setugi": "false",
"hive.metastore.kerberos.principal": "KRB_PRINCIPAL",
"hive.metastore.sasl.enabled": "true",
"metastore.catalog.default": "default_catalog",
"fs.s3a.access.key" : "S3_ACCESS_KEY",
"fs.s3a.secret.key" : "S3_SECRET_KEY",
"fs.s3a.endpoint" : "S3_API_ENDPOINT"
}
},
"icebergFilter": {
"type": "interval",
"filterColumn": "event_time",
"intervals": [
"2023-05-10T19:00:00.000Z/2023-05-10T20:00:00.000Z"
]
},
"warehouseSource": {
"type": "s3",
"endpointConfig": {
"url": "teststore.aws.com",
"signingRegion": "us-west-2a"
},
"clientConfig": {
"protocol": "http",
"disableChunkedEncoding": true,
"enablePathStyleAccess": true,
"forceGlobalBucketAccessEnabled": false
},
"properties": {
"accessKeyId": {
"type": "default",
"password": "foo"
},
"secretAccessKey": {
"type": "default",
"password": "bar"
}
}
}
},
"inputFormat": {
"type": "parquet"
}
},
...
},
```
|Property|Description|Required|
|--------|-----------|---------|
|type|Set the value to `iceberg`.|yes|
|tableName|The Iceberg table name configured in the catalog.|yes|
|namespace|The Iceberg namespace associated with the table.|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot.|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table.|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse.|yes|
### Catalog object
The catalog object supports `local` and `hive` catalog types.
The following table lists the properties of a `local` catalog:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `local`.|yes|
|warehousePath|The location of the warehouse associated with the catalog.|yes|
|catalogProperties|Map of any additional properties that need to be attached to the catalog.|no|
The following table lists the properties of a `hive` catalog:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `hive`.|yes|
|warehousePath|The location of the warehouse associated with the catalog.|yes|
|catalogUri|The URI associated with the Hive catalog.|yes|
|catalogProperties|Map of any additional properties that need to be attached to the catalog.|no|
### Iceberg filter object
This input source provides the following filters: `and`, `equals`, `interval`, `not`, and `or`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
`equals` Filter:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `equals`.|yes|
|filterColumn|The name of the column from the Iceberg table schema to use for filtering.|yes|
|filterValue|The value to filter on.|yes|
`interval` Filter:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `interval`.|yes|
|filterColumn|The name of the column from the Iceberg table schema to use for filtering.|yes|
|intervals|A JSON array of ISO 8601 interval strings that defines the time ranges to filter on. The start of each interval is inclusive and the end is exclusive.|yes|
`and` Filter:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `and`.|yes|
|filters|The list of Iceberg filters to combine with a logical AND.|yes|
`or` Filter:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `or`.|yes|
|filters|The list of Iceberg filters to combine with a logical OR.|yes|
`not` Filter:
|Property|Description|Required|
|--------|-----------|---------|
|type|Set this value to `not`.|yes|
|filter|The Iceberg filter to negate with a logical NOT.|yes|
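The following is a hypothetical example of composing these filters. It selects data files that match both an `equals` filter and an `interval` filter; the column names and values are placeholders:
```json
"icebergFilter": {
  "type": "and",
  "filters": [
    {
      "type": "equals",
      "filterColumn": "category",
      "filterValue": "shoes"
    },
    {
      "type": "interval",
      "filterColumn": "event_time",
      "intervals": [
        "2023-05-10T19:00:00.000Z/2023-05-10T20:00:00.000Z"
      ]
    }
  ]
}
```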
The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
- `range` or `single_dim` partitioning: greater than or equal to 1

View File

@ -0,0 +1,360 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-iceberg-extensions</artifactId>
<name>druid-iceberg-extensions</name>
<description>druid-iceberg-extensions</description>
<parent>
<artifactId>druid</artifactId>
<groupId>org.apache.druid</groupId>
<version>27.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.compile.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<!-- Following are excluded to remove security vulnerabilities: -->
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>3.1.3</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>runtime</scope>
<version>${hadoop.compile.version}</version>
<exclusions>
<exclusion>
<groupId>aopalliance</groupId>
<artifactId>aopalliance</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>javax.inject</groupId>
<artifactId>javax</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.compile.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.common;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import org.apache.druid.error.DruidException;
import org.apache.druid.iceberg.guice.HiveConf;
import org.apache.druid.iceberg.input.HiveIcebergCatalog;
import org.apache.druid.iceberg.input.IcebergInputSource;
import org.apache.druid.iceberg.input.LocalCatalog;
import org.apache.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.util.Collections;
import java.util.List;
public class IcebergDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.singletonList(
new SimpleModule("IcebergDruidModule")
.registerSubtypes(
new NamedType(HiveIcebergCatalog.class, HiveIcebergCatalog.TYPE_KEY),
new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY),
new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY)
)
);
}
@Override
public void configure(Binder binder)
{
final Configuration conf = new Configuration();
conf.setClassLoader(getClass().getClassLoader());
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
FileSystem.get(conf);
}
catch (Exception ex) {
throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build(ex, "Problem during fileSystem class level initialization");
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
binder.bind(Configuration.class).annotatedWith(HiveConf.class).toInstance(conf);
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
public class IcebergAndFilter implements IcebergFilter
{
private final List<IcebergFilter> filters;
private static final Logger log = new Logger(IcebergAndFilter.class);
@JsonCreator
public IcebergAndFilter(
@JsonProperty("filters") List<IcebergFilter> filters
)
{
Preconditions.checkArgument(filters != null && filters.size() > 0, "filter requires at least one field");
this.filters = filters;
}
@JsonProperty
public List<IcebergFilter> getFilters()
{
return filters;
}
@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}
@Override
public Expression getFilterExpression()
{
List<Expression> expressions = new ArrayList<>();
LinkedHashSet<IcebergFilter> flatFilters = flattenAndChildren(filters);
for (IcebergFilter filter : flatFilters) {
expressions.add(filter.getFilterExpression());
}
Expression finalExpr = Expressions.alwaysTrue();
for (Expression expr : expressions) {
finalExpr = Expressions.and(finalExpr, expr);
}
return finalExpr;
}
private static LinkedHashSet<IcebergFilter> flattenAndChildren(final Collection<IcebergFilter> filters)
{
final LinkedHashSet<IcebergFilter> retVal = new LinkedHashSet<>();
for (IcebergFilter child : filters) {
if (child instanceof IcebergAndFilter) {
retVal.addAll(flattenAndChildren(((IcebergAndFilter) child).getFilters()));
} else {
retVal.add(child);
}
}
return retVal;
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
public class IcebergEqualsFilter implements IcebergFilter
{
@JsonProperty
private final String filterColumn;
@JsonProperty
private final String filterValue;
@JsonCreator
public IcebergEqualsFilter(
@JsonProperty("filterColumn") String filterColumn,
@JsonProperty("filterValue") String filterValue
)
{
Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the equals filter");
Preconditions.checkNotNull(filterValue, "You must specify a filter value on the equals filter");
this.filterColumn = filterColumn;
this.filterValue = filterValue;
}
@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}
@Override
public Expression getFilterExpression()
{
return Expressions.equal(
filterColumn,
filterValue
);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
/**
* Interface to manage Iceberg expressions, which can be used to perform filtering on the Iceberg table
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "interval", value = IcebergIntervalFilter.class),
@JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class),
@JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
@JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
@JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class)
})
public interface IcebergFilter
{
TableScan filter(TableScan tableScan);
Expression getFilterExpression();
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
public class IcebergIntervalFilter implements IcebergFilter
{
@JsonProperty
private final String filterColumn;
@JsonProperty
private final List<Interval> intervals;
@JsonCreator
public IcebergIntervalFilter(
@JsonProperty("filterColumn") String filterColumn,
@JsonProperty("intervals") List<Interval> intervals
)
{
Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the interval filter");
Preconditions.checkArgument(intervals != null && intervals.size() > 0, "You must specify intervals on the interval filter");
this.filterColumn = filterColumn;
this.intervals = intervals;
}
@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}
@Override
public Expression getFilterExpression()
{
List<Expression> expressions = new ArrayList<>();
for (Interval filterInterval : intervals) {
// Converts the input timestamp string into iceberg TimestampType because TimestampType supports microsecond precision.
// This is to ensure that there are no precision mismatches when doing the comparison.
Long dateStart = (long) Literal.of(filterInterval.getStart().toString())
.to(Types.TimestampType.withZone())
.value();
Long dateEnd = (long) Literal.of(filterInterval.getEnd().toString())
.to(Types.TimestampType.withZone())
.value();
expressions.add(Expressions.and(
Expressions.greaterThanOrEqual(
filterColumn,
dateStart
),
Expressions.lessThan(
filterColumn,
dateEnd
)
));
}
Expression finalExpr = Expressions.alwaysFalse();
for (Expression filterExpr : expressions) {
finalExpr = Expressions.or(finalExpr, filterExpr);
}
return finalExpr;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
public class IcebergNotFilter implements IcebergFilter
{
private final IcebergFilter filter;
@JsonCreator
public IcebergNotFilter(
@JsonProperty("filter") IcebergFilter filter
)
{
Preconditions.checkNotNull(filter, "You must specify an iceberg filter");
this.filter = filter;
}
@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}
@Override
public Expression getFilterExpression()
{
return Expressions.not(filter.getFilterExpression());
}
@JsonProperty
public IcebergFilter getFilter()
{
return filter;
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
public class IcebergOrFilter implements IcebergFilter
{
private final List<IcebergFilter> filters;
private static final Logger log = new Logger(IcebergOrFilter.class);
@JsonCreator
public IcebergOrFilter(
@JsonProperty("filters") List<IcebergFilter> filters
)
{
Preconditions.checkArgument(filters != null && filters.size() > 0, "filter requires at least one field");
this.filters = filters;
}
@JsonProperty
public List<IcebergFilter> getFilters()
{
return filters;
}
@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}
@Override
public Expression getFilterExpression()
{
List<Expression> expressions = new ArrayList<>();
LinkedHashSet<IcebergFilter> flatFilters = flattenOrChildren(filters);
for (IcebergFilter filter : flatFilters) {
expressions.add(filter.getFilterExpression());
}
Expression finalExpr = Expressions.alwaysFalse();
for (Expression expr : expressions) {
finalExpr = Expressions.or(finalExpr, expr);
}
return finalExpr;
}
private static LinkedHashSet<IcebergFilter> flattenOrChildren(final Collection<IcebergFilter> filters)
{
final LinkedHashSet<IcebergFilter> retVal = new LinkedHashSet<>();
for (IcebergFilter child : filters) {
if (child instanceof IcebergOrFilter) {
retVal.addAll(flattenOrChildren(((IcebergOrFilter) child).getFilters()));
} else {
retVal.add(child);
}
}
return retVal;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.guice;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Binding annotation for implementations specific to the iceberg extension module.
* This is required as there can be overlap of classes bound by this extension and the hdfs-storage extension.
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface HiveConf
{
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.iceberg.guice.HiveConf;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
/**
* Hive Metastore-specific implementation of the Iceberg catalog.
* Kerberos authentication is performed if the credentials are provided in the catalog properties.
*/
public class HiveIcebergCatalog extends IcebergCatalog
{
public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
public static final String TYPE_KEY = "hive";
@JsonProperty
private String warehousePath;
@JsonProperty
private String catalogUri;
@JsonProperty
private Map<String, String> catalogProperties;
private final Configuration configuration;
private BaseMetastoreCatalog hiveCatalog;
private static final Logger log = new Logger(HiveIcebergCatalog.class);
@JsonCreator
public HiveIcebergCatalog(
@JsonProperty("warehousePath") String warehousePath,
@JsonProperty("catalogUri") String catalogUri,
@JsonProperty("catalogProperties") @Nullable
Map<String, Object> catalogProperties,
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject @HiveConf Configuration configuration
)
{
this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null");
this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null");
this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(catalogProperties, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, mapper);
this.configuration = configuration;
this.catalogProperties
.forEach(this.configuration::set);
this.hiveCatalog = retrieveCatalog();
}
@Override
public BaseMetastoreCatalog retrieveCatalog()
{
if (hiveCatalog == null) {
hiveCatalog = setupCatalog();
}
return hiveCatalog;
}
private HiveCatalog setupCatalog()
{
HiveCatalog catalog = new HiveCatalog();
authenticate();
catalog.setConf(configuration);
catalogProperties.put("warehouse", warehousePath);
catalogProperties.put("uri", catalogUri);
catalog.initialize("hive", catalogProperties);
return catalog;
}
private void authenticate()
{
String principal = catalogProperties.getOrDefault("principal", null);
String keytab = catalogProperties.getOrDefault("keytab", null);
if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) {
UserGroupInformation.setConfiguration(configuration);
if (UserGroupInformation.isSecurityEnabled()) {
try {
if (UserGroupInformation.getCurrentUser().hasKerberosCredentials() == false
|| !UserGroupInformation.getCurrentUser().getUserName().equals(principal)) {
log.info("Hive trying to authenticate user [%s] with keytab [%s]..", principal, keytab);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
}
catch (IOException e) {
throw new ISE(e, "Failed to authenticate user principal [%s] with keytab [%s]", principal, keytab);
}
}
}
}
public String getWarehousePath()
{
return warehousePath;
}
public String getCatalogUri()
{
return catalogUri;
}
public Map<String, String> getCatalogProperties()
{
return catalogProperties;
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import java.util.ArrayList;
import java.util.List;
/**
* Druid wrapper for an Iceberg catalog.
* The configured catalog is used to load the specified Iceberg table and retrieve the underlying live data files up to the latest snapshot.
* This does not perform any projections on the table yet, so all the underlying columns are retrieved from the data files.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
public abstract class IcebergCatalog
{
private static final Logger log = new Logger(IcebergCatalog.class);
public abstract BaseMetastoreCatalog retrieveCatalog();
/**
* Extract the Iceberg data files up to the latest snapshot associated with the table
*
* @param tableNamespace The catalog namespace under which the table is defined
* @param tableName The iceberg table name
* @return a list of data file paths
*/
public List<String> extractSnapshotDataFiles(
String tableNamespace,
String tableName,
IcebergFilter icebergFilter
)
{
Catalog catalog = retrieveCatalog();
Namespace namespace = Namespace.of(tableNamespace);
String tableIdentifier = tableNamespace + "." + tableName;
List<String> dataFilePaths = new ArrayList<>();
ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream()
.filter(tableId -> tableId.toString().equals(tableIdentifier))
.findFirst()
.orElseThrow(() -> new IAE(
" Couldn't retrieve table identifier for '%s'. Please verify that the table exists in the given catalog",
tableIdentifier
));
long start = System.currentTimeMillis();
TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan();
if (icebergFilter != null) {
tableScan = icebergFilter.filter(tableScan);
}
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
CloseableIterable.transform(tasks, FileScanTask::file)
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
long duration = System.currentTimeMillis() - start;
log.info("Data file scan and fetch took [%d ms] time for [%d] paths", duration, dataFilePaths.size());
}
catch (Exception e) {
throw new RE(e, "Failed to load iceberg table with identifier [%s]", tableIdentifier);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxClassloader);
}
return dataFilePaths;
}
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.iceberg.filter.IcebergFilter;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
/**
* Input source to ingest data managed by the Iceberg table format.
* This input source talks to the configured catalog, applies any configured filters, and retrieves the data file paths up to the latest snapshot associated with the Iceberg table.
* The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined.
*/
public class IcebergInputSource implements SplittableInputSource<List<String>>
{
public static final String TYPE_KEY = "iceberg";
@JsonProperty
private final String tableName;
@JsonProperty
private final String namespace;
@JsonProperty
private IcebergCatalog icebergCatalog;
@JsonProperty
private IcebergFilter icebergFilter;
@JsonProperty
private AbstractInputSourceBuilder warehouseSource;
private boolean isLoaded = false;
private SplittableInputSource delegateInputSource;
@JsonCreator
public IcebergInputSource(
@JsonProperty("tableName") String tableName,
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("warehouseSource") AbstractInputSourceBuilder warehouseSource
)
{
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
this.icebergFilter = icebergFilter;
this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
}
@Override
public boolean needsFormat()
{
return true;
}
@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
if (!isLoaded) {
retrieveIcebergDatafiles();
}
return getDelegateInputSource().reader(inputRowSchema, inputFormat, temporaryDirectory);
}
@Override
public Stream<InputSplit<List<String>>> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
) throws IOException
{
if (!isLoaded) {
retrieveIcebergDatafiles();
}
return getDelegateInputSource().createSplits(inputFormat, splitHintSpec);
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
{
if (!isLoaded) {
retrieveIcebergDatafiles();
}
return getDelegateInputSource().estimateNumSplits(inputFormat, splitHintSpec);
}
@Override
public InputSource withSplit(InputSplit<List<String>> inputSplit)
{
return getDelegateInputSource().withSplit(inputSplit);
}
@Override
public SplitHintSpec getSplitHintSpecOrDefault(@Nullable SplitHintSpec splitHintSpec)
{
return getDelegateInputSource().getSplitHintSpecOrDefault(splitHintSpec);
}
@JsonProperty
public String getTableName()
{
return tableName;
}
@JsonProperty
public String getNamespace()
{
return namespace;
}
@JsonProperty
public IcebergCatalog getIcebergCatalog()
{
return icebergCatalog;
}
@JsonProperty
public IcebergFilter getIcebergFilter()
{
return icebergFilter;
}
public SplittableInputSource getDelegateInputSource()
{
return delegateInputSource;
}
protected void retrieveIcebergDatafiles()
{
List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
getNamespace(),
getTableName(),
getIcebergFilter()
);
delegateInputSource = warehouseSource.setupInputSource(snapshotDataFiles);
isLoaded = true;
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
/**
* Iceberg catalog implementation that handles file-based catalogs created in the local filesystem.
*/
public class LocalCatalog extends IcebergCatalog
{
public static final String TYPE_KEY = "local";
@JsonProperty
private final String warehousePath;
@JsonProperty
private final Map<String, String> catalogProperties;
private BaseMetastoreCatalog catalog;
@JsonCreator
public LocalCatalog(
@JsonProperty("warehousePath") String warehousePath,
@JsonProperty("catalogProperties") @Nullable
Map<String, String> catalogProperties
)
{
Preconditions.checkNotNull(warehousePath, "warehousePath is null");
this.warehousePath = warehousePath;
this.catalogProperties = catalogProperties;
this.catalog = retrieveCatalog();
}
@JsonProperty
public String getWarehousePath()
{
return warehousePath;
}
@JsonProperty
public Map<String, String> getCatalogProperties()
{
return catalogProperties;
}
@Override
public BaseMetastoreCatalog retrieveCatalog()
{
if (catalog == null) {
catalog = setupCatalog();
}
return catalog;
}
private HadoopCatalog setupCatalog()
{
HadoopCatalog hadoopCatalog = new HadoopCatalog();
hadoopCatalog.setConf(new Configuration());
catalogProperties.put("warehouse", warehousePath);
hadoopCatalog.initialize("hadoop", catalogProperties);
return hadoopCatalog;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LocalCatalog that = (LocalCatalog) o;
return warehousePath.equals(that.warehousePath)
&& Objects.equals(catalogProperties, that.catalogProperties);
}
@Override
public int hashCode()
{
return Objects.hash(warehousePath, catalogProperties);
}
}
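
The catalog above is a thin wrapper over Iceberg's HadoopCatalog: `retrieveCatalog()` caches the instance, and `setupCatalog()` injects the `warehouse` property from `warehousePath`. A minimal sketch, assuming a scratch warehouse directory; note that the properties map is expected to be mutable (and non-null) because `setupCatalog()` adds the `warehouse` entry to it.

```java
import java.util.HashMap;

import org.apache.druid.iceberg.input.LocalCatalog;
import org.apache.iceberg.BaseMetastoreCatalog;

public class LocalCatalogSketch
{
  public static void main(String[] args)
  {
    // Illustrative warehouse directory; pass a mutable map, since setupCatalog()
    // adds the "warehouse" entry to it.
    LocalCatalog catalog = new LocalCatalog("/tmp/warehouse", new HashMap<>());

    // The wrapped Iceberg HadoopCatalog is created during construction and reports the name "hadoop".
    BaseMetastoreCatalog hadoopCatalog = catalog.retrieveCatalog();
    System.out.println(hadoopCatalog.name());
  }
}
```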

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.druid.iceberg.common.IcebergDruidModule

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class IcebergAndFilterTest
{
private final String INTERVAL_COLUMN = "eventTime";
private final String COLUMN1 = "column1";
private final String COLUMN2 = "column2";
private final Expression equalExpression1 = Expressions.equal(COLUMN1, "value1");
private final Expression equalExpression2 = Expressions.equal(COLUMN2, "value2");
private final Expression intervalExpression = Expressions.and(
Expressions.greaterThanOrEqual(
INTERVAL_COLUMN,
Literal.of("2022-01-01T00:00:00.000Z")
.to(Types.TimestampType.withZone())
.value()
),
Expressions.lessThan(
INTERVAL_COLUMN,
Literal.of("2022-01-02T00:00:00.000Z")
.to(Types.TimestampType.withZone())
.value()
)
);
@Test
public void testFilter()
{
IcebergAndFilter andFilter = new IcebergAndFilter(Arrays.asList(
new IcebergEqualsFilter(COLUMN1, "value1"),
new IcebergEqualsFilter(COLUMN2, "value2")
));
Expression expectedExpression = Expressions.and(equalExpression1, equalExpression2);
Assert.assertEquals(expectedExpression.toString(), andFilter.getFilterExpression().toString());
}
@Test
public void testEmptyFilter()
{
Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(null));
Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(Collections.emptyList()));
}
@Test
public void testNestedFilter()
{
IcebergAndFilter andFilter = new IcebergAndFilter(
Arrays.asList(
new IcebergAndFilter(
Arrays.asList(
new IcebergEqualsFilter(COLUMN1, "value1"),
new IcebergEqualsFilter(COLUMN2, "value2")
)),
new IcebergIntervalFilter(
INTERVAL_COLUMN,
Collections.singletonList(Intervals.of(
"2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z"))
)
));
Expression expectedExpression = Expressions.and(
Expressions.and(equalExpression1, equalExpression2),
intervalExpression
);
Assert.assertEquals(expectedExpression.toString(), andFilter.getFilterExpression().toString());
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.junit.Assert;
import org.junit.Test;
public class IcebergEqualsFilterTest
{
@Test
public void testFilter()
{
IcebergEqualsFilter testFilter = new IcebergEqualsFilter("column1", "value1");
Expression expectedExpression = Expressions.equal("column1", "value1");
Assert.assertEquals(expectedExpression.toString(), testFilter.getFilterExpression().toString());
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class IcebergIntervalFilterTest
{
@Test
public void testFilter()
{
String intervalColumn = "eventTime";
List<Interval> intervals = Arrays.asList(
Intervals.of("2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z"),
Intervals.of("2023-01-01T00:00:00.000Z/2023-01-01T00:00:00.000Z")
);
IcebergIntervalFilter intervalFilter = new IcebergIntervalFilter(intervalColumn, intervals);
Expression expectedExpression = Expressions.or(
Expressions.and(
Expressions.greaterThanOrEqual(
intervalColumn,
Literal.of(intervals.get(0).getStart().toString())
.to(Types.TimestampType.withZone())
.value()
),
Expressions.lessThan(
intervalColumn,
Literal.of(intervals.get(0).getEnd().toString())
.to(Types.TimestampType.withZone())
.value()
)
),
Expressions.and(
Expressions.greaterThanOrEqual(
intervalColumn,
Literal.of(intervals.get(1).getStart().toString())
.to(Types.TimestampType.withZone())
.value()
),
Expressions.lessThan(
intervalColumn,
Literal.of(intervals.get(1).getEnd().toString())
.to(Types.TimestampType.withZone())
.value()
)
)
);
Assert.assertEquals(expectedExpression.toString(), intervalFilter.getFilterExpression().toString());
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class IcebergNotFilterTest
{
@Test
public void testFilter()
{
IcebergNotFilter testFilter = new IcebergNotFilter(new IcebergEqualsFilter("column1", "value1"));
Expression expectedExpression = Expressions.not(Expressions.equal("column1", "value1"));
Assert.assertEquals(expectedExpression.toString(), testFilter.getFilterExpression().toString());
}
@Test
public void testNestedFilters()
{
String column1 = "column1";
String column2 = "column2";
IcebergNotFilter filterNotAnd = new IcebergNotFilter(
new IcebergAndFilter(Arrays.asList(
new IcebergEqualsFilter(
column1,
"value1"
),
new IcebergEqualsFilter(
column2,
"value2"
)
))
);
Expression expressionNotAnd = Expressions.not(Expressions.and(
Expressions.equal(column1, "value1"),
Expressions.equal(column2, "value2")
));
IcebergNotFilter filterNotOr = new IcebergNotFilter(
new IcebergOrFilter(Arrays.asList(
new IcebergEqualsFilter(
column1,
"value1"
),
new IcebergEqualsFilter(
column2,
"value2"
)
))
);
Expression expressionNotOr = Expressions.not(Expressions.or(
Expressions.equal(column1, "value1"),
Expressions.equal(column2, "value2")
));
Assert.assertEquals(expressionNotAnd.toString(), filterNotAnd.getFilterExpression().toString());
Assert.assertEquals(expressionNotOr.toString(), filterNotOr.getFilterExpression().toString());
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class IcebergOrFilterTest
{
private final String INTERVAL_COLUMN = "eventTime";
private final String COLUMN1 = "column1";
private final String COLUMN2 = "column2";
private final Expression equalExpression1 = Expressions.equal(COLUMN1, "value1");
private final Expression equalExpression2 = Expressions.equal(COLUMN2, "value2");
private final Expression intervalExpression = Expressions.and(
Expressions.greaterThanOrEqual(
INTERVAL_COLUMN,
Literal.of("2022-01-01T00:00:00.000Z")
.to(Types.TimestampType.withZone())
.value()
),
Expressions.lessThan(
INTERVAL_COLUMN,
Literal.of("2022-01-02T00:00:00.000Z")
.to(Types.TimestampType.withZone())
.value()
)
);
@Test
public void testFilter()
{
IcebergOrFilter orFilter = new IcebergOrFilter(Arrays.asList(
new IcebergEqualsFilter(COLUMN1, "value1"),
new IcebergEqualsFilter(COLUMN2, "value2")
));
Expression expectedExpression = Expressions.or(equalExpression1, equalExpression2);
Assert.assertEquals(expectedExpression.toString(), orFilter.getFilterExpression().toString());
}
@Test
public void testEmptyFilter()
{
Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(null));
Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(Collections.emptyList()));
}
@Test
public void testNestedFilters()
{
IcebergOrFilter filterOrOr = new IcebergOrFilter(
Arrays.asList(
new IcebergOrFilter(
Arrays.asList(
new IcebergEqualsFilter(COLUMN1, "value1"),
new IcebergEqualsFilter(COLUMN2, "value2")
)),
new IcebergIntervalFilter(
INTERVAL_COLUMN,
Collections.singletonList(Intervals.of(
"2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z"))
)
));
Expression expectedExpressionOrOr = Expressions.or(
Expressions.or(equalExpression1, equalExpression2),
intervalExpression
);
IcebergOrFilter filterOrAnd = new IcebergOrFilter(
Arrays.asList(
new IcebergAndFilter(
Arrays.asList(
new IcebergEqualsFilter(COLUMN1, "value1"),
new IcebergEqualsFilter(COLUMN2, "value2")
)),
new IcebergIntervalFilter(
INTERVAL_COLUMN,
Collections.singletonList(Intervals.of(
"2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z"))
)
));
Expression expectedExpressionOrAnd = Expressions.or(
Expressions.and(equalExpression1, equalExpression2),
intervalExpression
);
Assert.assertEquals(expectedExpressionOrOr.toString(), filterOrOr.getFilterExpression().toString());
Assert.assertEquals(expectedExpressionOrAnd.toString(), filterOrAnd.getFilterExpression().toString());
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.HashMap;
public class HiveIcebergCatalogTest
{
private final ObjectMapper mapper = new DefaultObjectMapper();
@Test
public void testCatalogCreate()
{
final File warehouseDir = FileUtils.createTempDir();
HiveIcebergCatalog hiveCatalog = new HiveIcebergCatalog(
warehouseDir.getPath(),
"hdfs://testuri",
new HashMap<>(),
mapper,
new Configuration()
);
HiveIcebergCatalog hiveCatalogNullProps = new HiveIcebergCatalog(
warehouseDir.getPath(),
"hdfs://testuri",
null,
mapper,
new Configuration()
);
Assert.assertEquals("hive", hiveCatalog.retrieveCatalog().name());
Assert.assertEquals(2, hiveCatalogNullProps.getCatalogProperties().size());
}
@Test
public void testAuthenticate()
{
UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"}));
final File warehouseDir = FileUtils.createTempDir();
HashMap<String, Object> catalogMap = new HashMap<>();
catalogMap.put("principal", "test");
catalogMap.put("keytab", "test");
HiveIcebergCatalog hiveCatalog = new HiveIcebergCatalog(
warehouseDir.getPath(),
"hdfs://testuri",
catalogMap,
mapper,
new Configuration()
);
Assert.assertEquals("hdfs://testuri", hiveCatalog.getCatalogUri());
}
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class IcebergInputSourceTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
IcebergCatalog testCatalog;
Schema tableSchema = new Schema(
Types.NestedField.required(1, "id", Types.StringType.get()),
Types.NestedField.required(2, "name", Types.StringType.get())
);
Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");
private static final String NAMESPACE = "default";
private static final String TABLENAME = "foosTable";
@Test
public void testInputSource() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
createAndLoadTable(tableIdentifier);
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceBuilder()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
.map(inpSource -> (LocalInputSource) inpSource)
.map(LocalInputSource::getFiles)
.flatMap(List::stream)
.collect(Collectors.toList());
Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null)));
Assert.assertEquals(1, localInputSourceList.size());
CloseableIterable<Record> datafileReader = Parquet.read(Files.localInput(localInputSourceList.get(0)))
.project(tableSchema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
tableSchema,
fileSchema
))
.build();
for (Record record : datafileReader) {
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
dropTableFromCatalog(tableIdentifier);
}
@Test
public void testInputSourceWithFilter() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
createAndLoadTable(tableIdentifier);
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
new IcebergEqualsFilter("id", "0000"),
testCatalog,
new LocalInputSourceBuilder()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
.map(inpSource -> (LocalInputSource) inpSource)
.map(LocalInputSource::getFiles)
.flatMap(List::stream)
.collect(Collectors.toList());
Assert.assertEquals(0, localInputSourceList.size());
dropTableFromCatalog(tableIdentifier);
}
private void createAndLoadTable(TableIdentifier tableIdentifier) throws IOException
{
//Setup iceberg table and schema
Table icebergTableFromSchema = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema);
//Generate an iceberg record and write it to a file
GenericRecord record = GenericRecord.create(tableSchema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
builder.add(record.copy(tableData));
String filepath = icebergTableFromSchema.location() + "/" + UUID.randomUUID();
OutputFile file = icebergTableFromSchema.io().newOutputFile(filepath);
DataWriter<GenericRecord> dataWriter =
Parquet.writeData(file)
.schema(tableSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build();
try {
for (GenericRecord genRecord : builder.build()) {
dataWriter.write(genRecord);
}
}
finally {
dataWriter.close();
}
DataFile dataFile = dataWriter.toDataFile();
//Add the data file to the iceberg table
icebergTableFromSchema.newAppend().appendFile(dataFile).commit();
}
private void dropTableFromCatalog(TableIdentifier tableIdentifier)
{
testCatalog.retrieveCatalog().dropTable(tableIdentifier);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.HashMap;
public class LocalCatalogTest
{
@Test
public void testCatalogSerDe() throws JsonProcessingException
{
final File warehouseDir = FileUtils.createTempDir();
DefaultObjectMapper mapper = new DefaultObjectMapper();
LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
LocalCatalog after = mapper.readValue(
mapper.writeValueAsString(before), LocalCatalog.class);
Assert.assertEquals(before, after);
Assert.assertEquals("hadoop", before.retrieveCatalog().name());
Assert.assertEquals("hadoop", after.retrieveCatalog().name());
}
@Test
public void testEqualsContract()
{
EqualsVerifier.forClass(LocalCatalog.class).withNonnullFields("warehousePath").withIgnoredFields("catalog").usingGetClass().verify();
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
{
private final Configuration configuration;
private final HdfsInputSourceConfig inputSourceConfig;
@JsonCreator
public HdfsInputSourceBuilder(
@JacksonInject @Hdfs Configuration configuration,
@JacksonInject HdfsInputSourceConfig inputSourceConfig
)
{
this.configuration = configuration;
this.inputSourceConfig = inputSourceConfig;
}
@Override
public SplittableInputSource generateInputSource(List<String> inputFilePaths)
{
return new HdfsInputSource(inputFilePaths, configuration, inputSourceConfig);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
@ -65,7 +66,8 @@ public class HdfsStorageDruidModule implements DruidModule
return Collections.singletonList(
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSourceBuilder.class, HdfsStorageDruidModule.SCHEME)
)
);
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class HdfsInputSourceAdapterTest
{
@Test
public void testAdapterGet()
{
Configuration conf = new Configuration();
HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
HdfsInputSourceBuilder hdfsInputSourceAdapter = new HdfsInputSourceBuilder(conf, inputSourceConfig);
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet", "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.s3;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import javax.annotation.Nullable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
public class S3InputSourceBuilder extends AbstractInputSourceBuilder
{
private final ServerSideEncryptingAmazonS3 s3Client;
private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
private final S3InputSourceConfig s3InputSourceConfig;
private final S3InputDataConfig inputDataConfig;
private final AWSCredentialsProvider awsCredentialsProvider;
private final AWSProxyConfig awsProxyConfig;
private final AWSClientConfig awsClientConfig;
private final AWSEndpointConfig awsEndpointConfig;
@JsonCreator
public S3InputSourceBuilder(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
@JacksonInject S3InputDataConfig inputDataConfig,
@JacksonInject AWSCredentialsProvider awsCredentialsProvider,
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
@JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
@JsonProperty("endpointConfig") @Nullable AWSEndpointConfig awsEndpointConfig,
@JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
)
{
this.s3Client = s3Client;
this.s3ClientBuilder = s3ClientBuilder;
this.inputDataConfig = inputDataConfig;
this.awsCredentialsProvider = awsCredentialsProvider;
this.s3InputSourceConfig = s3InputSourceConfig;
this.awsProxyConfig = awsProxyConfig;
this.awsEndpointConfig = awsEndpointConfig;
this.awsClientConfig = awsClientConfig;
}
@Override
public SplittableInputSource generateInputSource(List<String> inputFilePaths)
{
return new S3InputSource(
s3Client,
s3ClientBuilder,
inputDataConfig,
awsCredentialsProvider,
inputFilePaths.stream().map(chosenPath -> {
try {
return new URI(StringUtils.replace(chosenPath, "s3a://", "s3://"));
}
catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}).collect(
Collectors.toList()),
null,
null,
null,
s3InputSourceConfig,
awsProxyConfig,
awsEndpointConfig,
awsClientConfig
);
}
@Nullable
@JsonProperty("properties")
@JsonInclude(JsonInclude.Include.NON_NULL)
public S3InputSourceConfig getS3InputSourceConfig()
{
return s3InputSourceConfig;
}
@Nullable
@JsonProperty("proxyConfig")
@JsonInclude(JsonInclude.Include.NON_NULL)
public AWSProxyConfig getAwsProxyConfig()
{
return awsProxyConfig;
}
@Nullable
@JsonProperty("clientConfig")
@JsonInclude(JsonInclude.Include.NON_NULL)
public AWSClientConfig getAwsClientConfig()
{
return awsClientConfig;
}
@Nullable
@JsonProperty("endpointConfig")
@JsonInclude(JsonInclude.Include.NON_NULL)
public AWSEndpointConfig getAwsEndpointConfig()
{
return awsEndpointConfig;
}
}
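
One detail in `generateInputSource` above: data file paths that the catalog records with the Hadoop `s3a://` scheme are rewritten to `s3://` URIs before the S3InputSource is constructed. A minimal sketch of just that translation step, with an illustrative bucket and object key:

```java
import java.net.URI;

import org.apache.druid.java.util.common.StringUtils;

public class S3SchemeTranslationSketch
{
  public static void main(String[] args) throws Exception
  {
    // Warehouse locations written through Hadoop's S3A filesystem carry the "s3a" scheme;
    // the builder rewrites it to "s3" so the S3 input source can read the file directly.
    String icebergDataFile = "s3a://example-bucket/warehouse/db/table/data/part-0001.parquet";
    URI s3Uri = new URI(StringUtils.replace(icebergDataFile, "s3a://", "s3://"));
    System.out.println(s3Uri);
  }
}
```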

View File

@ -38,7 +38,10 @@ public class S3InputSourceDruidModule implements DruidModule
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule().registerSubtypes(
new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME),
new NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME)
)
);
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.s3;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class S3InputSourceBuilderTest
{
@Test
public void testAdapterGet()
{
ServerSideEncryptingAmazonS3.Builder serverSides3Builder =
EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
ServerSideEncryptingAmazonS3 service = EasyMock.createMock(ServerSideEncryptingAmazonS3.class);
S3InputDataConfig dataConfig = EasyMock.createMock(S3InputDataConfig.class);
List<String> fileUris = Arrays.asList(
"s3://foo/bar/file.csv",
"s3://bar/foo/file2.csv",
"s3://bar/foo/file3.txt"
);
S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
service,
serverSides3Builder,
dataConfig,
null,
null,
null,
null,
null
);
Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof S3InputSource);
}
}

View File

@ -221,6 +221,7 @@
<module>extensions-contrib/prometheus-emitter</module>
<module>extensions-contrib/opentelemetry-emitter</module>
<module>extensions-contrib/kubernetes-overlord-extensions</module>
<module>extensions-contrib/druid-iceberg-extensions</module>
<!-- distribution packaging -->
<module>distribution</module>
<!-- Revised integration tests -->
@ -259,7 +260,6 @@
</snapshots>
</pluginRepository>
</pluginRepositories>
<dependencyManagement>
<dependencies>
<!-- Compile Scope -->

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
/**
* A wrapper on top of {@link SplittableInputSource} that handles input source creation.
* For composing input sources such as IcebergInputSource, instantiating the delegate input source at deserialization time can fail
* because the input file paths are not yet known, which would trip the delegate's precondition checks.
* This builder defers creating the delegate input source until the input file paths are fully determined.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value = LocalInputSourceBuilder.class)
})
public abstract class AbstractInputSourceBuilder
{
public abstract SplittableInputSource generateInputSource(List<String> inputFilePaths);
public SplittableInputSource setupInputSource(List<String> inputFilePaths)
{
if (inputFilePaths.isEmpty()) {
return new EmptyInputSource();
} else {
return generateInputSource(inputFilePaths);
}
}
/**
* This input source is used in place of a delegate input source if there are no input file paths.
* Certain input sources cannot be instantiated with an empty input file list, so composing input sources such as IcebergInputSource
* may use this input source as the delegate in that case.
*/
private static class EmptyInputSource implements SplittableInputSource
{
@Override
public boolean needsFormat()
{
return false;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
)
{
return new InputSourceReader()
{
@Override
public CloseableIterator<InputRow> read(InputStats inputStats)
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
});
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample()
{
return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
});
}
};
}
@Override
public Stream<InputSplit> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
)
{
return Stream.empty();
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
return 0;
}
@Override
public InputSource withSplit(InputSplit split)
{
return null;
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import java.io.File;
import java.util.List;
import java.util.stream.Collectors;
public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
{
public static final String TYPE_KEY = "local";
@Override
public LocalInputSource generateInputSource(List<String> inputFilePaths)
{
return new LocalInputSource(
null,
null,
inputFilePaths.stream().map(chosenPath -> new File(chosenPath)).collect(
Collectors.toList())
);
}
}
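
Composing input sources call `setupInputSource` rather than `generateInputSource` directly, so a filter that matches no live data files yields the no-op EmptyInputSource instead of a delegate that would fail its precondition checks. A minimal sketch using the LocalInputSourceBuilder above, with illustrative file names:

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
import org.apache.druid.data.input.impl.SplittableInputSource;

public class InputSourceBuilderSketch
{
  public static void main(String[] args)
  {
    LocalInputSourceBuilder builder = new LocalInputSourceBuilder();

    // With resolved data file paths, the real delegate is produced.
    SplittableInputSource delegate =
        builder.setupInputSource(Arrays.asList("foo.parquet", "bar.parquet"));
    System.out.println(delegate instanceof LocalInputSource); // true

    // With no matching data files, the no-op EmptyInputSource is returned:
    // it is not splittable, needs no format, and produces zero splits.
    SplittableInputSource empty = builder.setupInputSource(Collections.emptyList());
    System.out.println(empty.isSplittable()); // false
    System.out.println(empty.needsFormat());  // false
  }
}
```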

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class LocalInputSourceAdapterTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testAdapterGet()
{
LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
"foo.parquet",
"bar.parquet"
)) instanceof LocalInputSource);
}
@Test
public void testAdapterSetup()
{
LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
InputSource delegateInputSource = localInputSourceAdapter.setupInputSource(Arrays.asList(
"foo.parquet",
"bar.parquet"
));
Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
}
@Test
public void testEmptyInputSource() throws IOException
{
InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder();
SplittableInputSource<Object> emptyInputSource =
(SplittableInputSource<Object>) localInputSourceAdapter.setupInputSource(Collections.emptyList());
List<InputSplit<Object>> splitList = emptyInputSource
.createSplits(mockFormat, mockSplitHint)
.collect(Collectors.toList());
Assert.assertTrue(splitList.isEmpty());
Assert.assertFalse(emptyInputSource.isSplittable());
Assert.assertFalse(emptyInputSource.needsFormat());
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat, mockSplitHint));
Assert.assertFalse(emptyInputSource.reader(
EasyMock.createMock(InputRowSchema.class),
mockFormat,
temporaryFolder.newFolder()
).read().hasNext());
}
}

View File

@ -109,6 +109,8 @@ html
HyperLogLog
IAM
IANA
IcebergFilter
IcebergInputSource
IETF
IP
IPv4
@ -312,6 +314,8 @@ featureSpec
findColumnsFromHeader
filenames
filesystem
filterColumn
filterValue
firefox
firehose
firehoses
@ -375,6 +379,7 @@ max_map_count
memcached
mergeable
metadata
metastores
millis
microbatch
microbatches
@ -483,6 +488,7 @@ skipHeaderRows
Smoosh
smoosh
smooshed
snapshotting
splittable
ssl
sslmode
@ -1226,6 +1232,8 @@ pom.xml
0.7.x
0.7.x.
TimeAndDims
catalogProperties
catalogUri
column2
column_1
column_n
@ -1234,6 +1242,8 @@ ctrl
descriptorString
headerFormat
headerLabelPrefix
icebergFilter
icebergCatalog
jsonLowercase
kafka
KafkaStringHeaderFormat
@ -1306,6 +1316,7 @@ segmentOutputPath
segmentTable
shardSpec
single_dim
tableName
targetPartitionSize
targetRowsPerSegment
useCombiner
@ -1398,6 +1409,8 @@ BUILD_SEGMENTS
DETERMINE_PARTITIONS
forceTimeChunkLock
taskLockTimeout
warehouseSource
warehousePath
index.md
DOUBLE_ARRAY
DOY