From 03d6d395a0ae7b03743dec3a0e447648d8aca126 Mon Sep 17 00:00:00 2001 From: Atul Mohan Date: Mon, 17 Jul 2023 20:29:57 -0700 Subject: [PATCH] Extension to read and ingest iceberg data files (#14329) This adds a new contrib extension: druid-iceberg-extensions which can be used to ingest data stored in Apache Iceberg format. It adds a new input source of type iceberg that connects to a catalog and retrieves the data files associated with an iceberg table and provides these data file paths to either an S3 or HDFS input source depending on the warehouse location. Two important dependencies associated with Apache Iceberg tables are: Catalog : This extension supports reading from either a Hive Metastore catalog or a Local file-based catalog. Support for AWS Glue is not available yet. Warehouse : This extension supports reading data files from either HDFS or S3. Adapters for other cloud object locations should be easy to add by extending the AbstractInputSourceAdapter. --- distribution/pom.xml | 2 + .../development/extensions-contrib/iceberg.md | 117 ++++++ docs/ingestion/input-sources.md | 190 +++++++++ .../druid-iceberg-extensions/pom.xml | 360 ++++++++++++++++++ .../iceberg/common/IcebergDruidModule.java | 78 ++++ .../iceberg/filter/IcebergAndFilter.java | 91 +++++ .../iceberg/filter/IcebergEqualsFilter.java | 65 ++++ .../druid/iceberg/filter/IcebergFilter.java | 43 +++ .../iceberg/filter/IcebergIntervalFilter.java | 92 +++++ .../iceberg/filter/IcebergNotFilter.java | 60 +++ .../druid/iceberg/filter/IcebergOrFilter.java | 91 +++++ .../apache/druid/iceberg/guice/HiveConf.java | 38 ++ .../iceberg/input/HiveIcebergCatalog.java | 140 +++++++ .../druid/iceberg/input/IcebergCatalog.java | 104 +++++ .../iceberg/input/IcebergInputSource.java | 176 +++++++++ .../druid/iceberg/input/LocalCatalog.java | 111 ++++++ ...rg.apache.druid.initialization.DruidModule | 16 + .../iceberg/filter/IcebergAndFilterTest.java | 96 +++++ .../filter/IcebergEqualsFilterTest.java | 36 ++ .../filter/IcebergIntervalFilterTest.java | 78 ++++ .../iceberg/filter/IcebergNotFilterTest.java | 82 ++++ .../iceberg/filter/IcebergOrFilterTest.java | 116 ++++++ .../iceberg/input/HiveIcebergCatalogTest.java | 77 ++++ .../iceberg/input/IcebergInputSourceTest.java | 184 +++++++++ .../druid/iceberg/input/LocalCatalogTest.java | 52 +++ .../hdfs/HdfsInputSourceBuilder.java | 51 +++ .../storage/hdfs/HdfsStorageDruidModule.java | 4 +- .../hdfs/HdfsInputSourceAdapterTest.java | 38 ++ .../data/input/s3/S3InputSourceBuilder.java | 134 +++++++ .../input/s3/S3InputSourceDruidModule.java | 5 +- .../input/s3/S3InputSourceBuilderTest.java | 58 +++ pom.xml | 2 +- .../input/AbstractInputSourceBuilder.java | 123 ++++++ .../input/impl/LocalInputSourceBuilder.java | 42 ++ .../impl/LocalInputSourceAdapterTest.java | 87 +++++ website/.spelling | 13 + 36 files changed, 3049 insertions(+), 3 deletions(-) create mode 100644 docs/development/extensions-contrib/iceberg.md create mode 100644 extensions-contrib/druid-iceberg-extensions/pom.xml create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergAndFilter.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergEqualsFilter.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java 
create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergIntervalFilter.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergNotFilter.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergOrFilter.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/guice/HiveConf.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergAndFilterTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergEqualsFilterTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergIntervalFilterTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergNotFilterTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergOrFilterTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java create mode 100644 extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java create mode 100644 extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java create mode 100644 extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java create mode 100644 extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java create mode 100644 extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java create mode 100644 processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java create mode 100644 processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java create mode 100644 processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java diff --git a/distribution/pom.xml b/distribution/pom.xml index c2aa28f75d3..0d7e16e00e4 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -633,6 +633,8 @@ org.apache.druid.extensions.contrib:aliyun-oss-extensions -c org.apache.druid.extensions.contrib:opentelemetry-emitter + -c + org.apache.druid.extensions:druid-iceberg-extensions diff --git a/docs/development/extensions-contrib/iceberg.md b/docs/development/extensions-contrib/iceberg.md new file mode 
100644 index 00000000000..0c02d6bf709 --- /dev/null +++ b/docs/development/extensions-contrib/iceberg.md @@ -0,0 +1,117 @@
+---
+id: iceberg
+title: "Iceberg extension"
+---
+
+
+## Iceberg Ingest extension
+
+Apache Iceberg is an open table format for huge analytic datasets. [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) lets you ingest data stored in the Iceberg table format into Apache Druid. To use the Iceberg extension, add `druid-iceberg-extensions` to the list of loaded extensions. See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
+
+Iceberg manages most of its metadata in metadata files in object storage. However, it still depends on a metastore to manage a certain amount of metadata.
+Iceberg refers to these metastores as catalogs. The Iceberg extension lets you connect to the following Iceberg catalog types:
+* Hive metastore catalog
+* Local catalog
+
+Druid does not support AWS Glue or REST-based catalogs yet.
+
+For a given catalog, the Iceberg input source reads the table name from the catalog, applies the filters, and extracts all the underlying live data files up to the latest snapshot.
+The data files can be in Parquet, ORC, or Avro formats. The data files typically reside in a warehouse location, which can be in HDFS, S3, or the local filesystem.
+The `druid-iceberg-extensions` extension relies on the existing input source connectors in Druid to read the data files from the warehouse. Therefore, the Iceberg input source can be considered an intermediate input source that provides the file paths for other input source implementations.
+
+## Hive metastore catalog
+
+For Druid to seamlessly talk to the Hive metastore, ensure that the Hive configuration files such as `hive-site.xml` and `core-site.xml` are available on the Druid classpath for peon processes.
+You can also specify Hive properties under the `catalogProperties` object in the ingestion spec.
+
+The `druid-iceberg-extensions` extension currently supports only HDFS, S3, and local warehouse directories.
+
+### Read from HDFS warehouse
+
+To read from an HDFS warehouse, load the `druid-hdfs-storage` extension. Druid extracts data file paths from the Hive metastore catalog and uses the [HDFS input source](../../ingestion/input-sources.md#hdfs-input-source) to ingest these files.
+The `warehouseSource` type in the ingestion spec should be `hdfs`.
+
+For authenticating with Kerberized clusters, include the `principal` and `keytab` properties in the `catalogProperties` object:
+
+```json
+"catalogProperties": {
+  "principal": "krb_principal",
+  "keytab": "/path/to/keytab"
+}
+```
+Only Kerberos-based authentication is supported as of now.
+
+### Read from S3 warehouse
+
+To read from an S3 warehouse, load the `druid-s3-extensions` extension. Druid extracts the data file paths from the Hive metastore catalog and uses `S3InputSource` to ingest these files.
+Set the `type` property of the `warehouseSource` object to `s3` in the ingestion spec.
+If the S3 endpoint for the warehouse is different from the endpoint configured for deep storage, include the following properties in the `warehouseSource` object to define the S3 endpoint settings:
+
+```json
+"warehouseSource": {
+  "type": "s3",
+  "endpointConfig": {
+    "url": "S3_ENDPOINT_URL",
+    "signingRegion": "us-east-1"
+  },
+  "clientConfig": {
+    "protocol": "http",
+    "disableChunkedEncoding": true,
+    "enablePathStyleAccess": true,
+    "forceGlobalBucketAccessEnabled": false
+  },
+  "properties": {
+    "accessKeyId": {
+      "type": "default",
+      "password": ""
+    }
+  }
+}
+```
+
+This extension uses the [Hadoop AWS module](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/) to connect to S3 and retrieve the metadata and data file paths.
+The following properties are required in the `catalogProperties`:
+
+```json
+"catalogProperties": {
+  "fs.s3a.access.key" : "S3_ACCESS_KEY",
+  "fs.s3a.secret.key" : "S3_SECRET_KEY",
+  "fs.s3a.endpoint" : "S3_API_ENDPOINT"
+}
+```
+Since the Hadoop AWS connector uses the `s3a` filesystem client, specify the warehouse path with the `s3a://` protocol instead of `s3://`.
+
+## Local catalog
+
+The local catalog type can be used for catalogs configured on the local filesystem. Set the `icebergCatalog` type to `local`. You can use this catalog for demos or localized tests. It is not recommended for production use cases.
+The `warehouseSource` is set to `local` because this catalog only supports reading from a local filesystem.
+
+## Known limitations
+
+This section lists the known limitations that apply to the Iceberg extension.
+
+- This extension does not fully utilize Iceberg features such as snapshotting or schema evolution.
+- The Iceberg input source reads every single live file in the Iceberg table up to the latest snapshot, which makes the table scan less performant. It is recommended to use Iceberg filters on partition columns in the ingestion spec to limit the number of data files retrieved. Since Druid doesn't store the last ingested Iceberg snapshot ID, it cannot identify the files created between that snapshot and the latest snapshot on Iceberg.
- It does not handle Iceberg [schema evolution](https://iceberg.apache.org/docs/latest/evolution/) yet. If an existing Iceberg table column is deleted and recreated with the same name, ingesting this table into Druid may bring in data for this column from before it was deleted.
+- The Hive catalog has not been tested on Hadoop 2.x.x and is not guaranteed to work with Hadoop 2.
\ No newline at end of file
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md index fe2d226b912..f5844c1d370 100644 --- a/docs/ingestion/input-sources.md +++ b/docs/ingestion/input-sources.md @@ -794,6 +794,196 @@ The following is an example of a Combining input source spec: ... ```
+## Iceberg input source
+
+> To use the Iceberg input source, add the `druid-iceberg-extensions` extension.
+
+You use the Iceberg input source to read data stored in the Iceberg table format. For a given table, the input source scans up to the latest Iceberg snapshot from the configured catalog. Druid ingests the underlying live data files using the existing input source formats.
+
+The Iceberg input source cannot be used independently, as it relies on the existing input sources to read from the data files.
+For example, if the warehouse associated with an Iceberg catalog is on S3, you must also load the [`druid-s3-extensions`](../development/extensions-core/s3.md) extension.
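For a file-based `local` catalog, no sample spec is included in this patch; a minimal sketch assembled from the property tables below (the table name, namespace, and warehouse path are hypothetical placeholders) might look like the following:

```json
...
  "ioConfig": {
    "type": "index_parallel",
    "inputSource": {
      "type": "iceberg",
      "tableName": "iceberg_table",
      "namespace": "iceberg_namespace",
      "icebergCatalog": {
        "type": "local",
        "warehousePath": "/tmp/iceberg_warehouse"
      },
      "warehouseSource": {
        "type": "local"
      }
    },
    "inputFormat": {
      "type": "parquet"
    }
  },
...
```

As noted in the extension documentation, the `local` catalog and warehouse source are intended for demos and local tests rather than production use.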
+
+The following is a sample spec for an HDFS warehouse source:
+
+```json
+...
+  "ioConfig": {
+    "type": "index_parallel",
+    "inputSource": {
+      "type": "iceberg",
+      "tableName": "iceberg_table",
+      "namespace": "iceberg_namespace",
+      "icebergCatalog": {
+        "type": "hive",
+        "warehousePath": "hdfs://warehouse/path",
+        "catalogUri": "thrift://hive-metastore.x.com:8970",
+        "catalogProperties": {
+          "hive.metastore.connect.retries": "1",
+          "hive.metastore.execute.setugi": "false",
+          "hive.metastore.kerberos.principal": "KRB_PRINCIPAL",
+          "hive.metastore.sasl.enabled": "true",
+          "metastore.catalog.default": "catalog_test",
+          "hadoop.security.authentication": "kerberos",
+          "hadoop.security.authorization": "true"
+        }
+      },
+      "icebergFilter": {
+        "type": "interval",
+        "filterColumn": "event_time",
+        "intervals": [
+          "2023-05-10T19:00:00.000Z/2023-05-10T20:00:00.000Z"
+        ]
+      },
+      "warehouseSource": {
+        "type": "hdfs"
+      }
+    },
+    "inputFormat": {
+      "type": "parquet"
+    }
+  },
+  ...
+},
+...
+```
+
+The following is a sample spec for an S3 warehouse source:
+
+```json
+...
+  "ioConfig": {
+    "type": "index_parallel",
+    "inputSource": {
+      "type": "iceberg",
+      "tableName": "iceberg_table",
+      "namespace": "iceberg_namespace",
+      "icebergCatalog": {
+        "type": "hive",
+        "warehousePath": "s3a://warehouse/path",
+        "catalogUri": "thrift://hive-metastore.x.com:8970",
+        "catalogProperties": {
+          "hive.metastore.connect.retries": "1",
+          "hive.metastore.execute.setugi": "false",
+          "hive.metastore.kerberos.principal": "KRB_PRINCIPAL",
+          "hive.metastore.sasl.enabled": "true",
+          "metastore.catalog.default": "default_catalog",
+          "fs.s3a.access.key" : "S3_ACCESS_KEY",
+          "fs.s3a.secret.key" : "S3_SECRET_KEY",
+          "fs.s3a.endpoint" : "S3_API_ENDPOINT"
+        }
+      },
+      "icebergFilter": {
+        "type": "interval",
+        "filterColumn": "event_time",
+        "intervals": [
+          "2023-05-10T19:00:00.000Z/2023-05-10T20:00:00.000Z"
+        ]
+      },
+      "warehouseSource": {
+        "type": "s3",
+        "endpointConfig": {
+          "url": "teststore.aws.com",
+          "signingRegion": "us-west-2"
+        },
+        "clientConfig": {
+          "protocol": "http",
+          "disableChunkedEncoding": true,
+          "enablePathStyleAccess": true,
+          "forceGlobalBucketAccessEnabled": false
+        },
+        "properties": {
+          "accessKeyId": {
+            "type": "default",
+            "password": "foo"
+          },
+          "secretAccessKey": {
+            "type": "default",
+            "password": "bar"
+          }
+        }
+      }
+    },
+    "inputFormat": {
+      "type": "parquet"
+    }
+  },
+...
+},
+```
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set the value to `iceberg`.|yes|
+|tableName|The Iceberg table name configured in the catalog.|yes|
+|namespace|The Iceberg namespace associated with the table.|yes|
+|icebergFilter|The JSON object used to filter data files within a snapshot.|no|
+|icebergCatalog|The JSON object used to define the catalog that manages the configured Iceberg table.|yes|
+|warehouseSource|The JSON object that defines the native input source for reading the data files from the warehouse.|yes|
+
+### Catalog object
+
+The catalog object supports `local` and `hive` catalog types.
+
+The following table lists the properties of a `local` catalog:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `local`.|yes|
+|warehousePath|The location of the warehouse associated with the catalog.|yes|
+|catalogProperties|A map of any additional properties to be attached to the catalog.|no|
+
+The following table lists the properties of a `hive` catalog:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `hive`.|yes|
+|warehousePath|The location of the warehouse associated with the catalog.|yes|
+|catalogUri|The URI associated with the Hive catalog.|yes|
+|catalogProperties|A map of any additional properties to be attached to the catalog.|no|
+
+### Iceberg filter object
+
+This input source provides the following filters: `and`, `equals`, `interval`, `not`, and `or`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
+
+`equals` Filter:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `equals`.|yes|
+|filterColumn|The name of the column from the Iceberg table schema to use for filtering.|yes|
+|filterValue|The value to filter on.|yes|
+
+`interval` Filter:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `interval`.|yes|
+|filterColumn|The name of the column from the Iceberg table schema to use for filtering.|yes|
+|intervals|A JSON array of ISO 8601 interval strings that defines the time ranges to filter on. The start of each interval is inclusive and the end is exclusive.|yes|
+
+`and` Filter:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `and`.|yes|
+|filters|The list of Iceberg filters to combine with logical AND.|yes|
+
+`or` Filter:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `or`.|yes|
+|filters|The list of Iceberg filters to combine with logical OR.|yes|
+
+`not` Filter:
+
+|Property|Description|Required|
+|--------|-----------|---------|
+|type|Set this value to `not`.|yes|
+|filter|The Iceberg filter to which logical NOT is applied.|yes|
+
+
 The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method: - `range` or `single_dim` partitioning: greater than or equal to 1 diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml new file mode 100644 index 00000000000..baa08a5fbbb --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -0,0 +1,360 @@ + + + + + org.apache.druid.extensions + druid-iceberg-extensions + druid-iceberg-extensions + druid-iceberg-extensions + + + druid + org.apache.druid + 27.0.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.compile.version} + compile + + + io.netty + netty-buffer + + + commons-cli + commons-cli + + + log4j + log4j + + + commons-codec + commons-codec + + + commons-logging + commons-logging + + + commons-io + commons-io + + + commons-lang + commons-lang + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + javax.ws.rs + jsr311-api + + + com.google.code.findbugs + jsr305 + + + org.mortbay.jetty + jetty-util + + + com.google.protobuf + protobuf-java + + + com.sun.jersey + jersey-core + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.commons + commons-math3 + + + com.google.guava + guava + + + org.apache.avro + avro + + + net.java.dev.jets3t + jets3t + + + com.sun.jersey + jersey-json + + + com.jcraft + jsch + + + org.mortbay.jetty + jetty + + + com.sun.jersey + jersey-server + + + + commons-beanutils + commons-beanutils-core + + + + + + org.apache.iceberg + iceberg-spark-runtime-3.3_2.12 + 1.0.0 + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + + + + org.apache.hive + hive-metastore + 3.1.3 + runtime + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hbase + hbase-client + + + org.apache.curator + curator-client + + + + + + org.apache.druid + druid-processing + ${project.parent.version} + provided + + + org.slf4j + slf4j-api + + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + com.fasterxml.jackson.core + jackson-databind + provided + + + + org.apache.hadoop + hadoop-hdfs-client + runtime + + + + org.apache.hadoop + hadoop-mapreduce-client-core + runtime + ${hadoop.compile.version} + + + aopalliance + aopalliance + + + org.apache.commons + commons-compress + + + com.google.guava + guava + + + com.google.inject + guice + + + com.google.inject.extensions + guice-servlet + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + javax.inject + javax + + + io.netty + netty + + + slf4j-log4j12 + org.slf4j + + + org.slf4j + slf4j-api + + + protobuf-java + com.google.protobuf + + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.compile.version} + runtime + + + com.amazonaws + aws-java-sdk-bundle + + + + + + com.google.code.findbugs + jsr305 + provided + + + com.google.guava + guava + provided + + + com.google.inject + guice + provided + + + joda-time + joda-time + provided + + + com.fasterxml.jackson.core + jackson-core + provided + + + org.slf4j + 
slf4j-api + provided + + + + junit + junit + test + + + nl.jqno.equalsverifier + equalsverifier + test + + + \ No newline at end of file diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java new file mode 100644 index 00000000000..61ff8f878c4 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/common/IcebergDruidModule.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.common; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.error.DruidException; +import org.apache.druid.iceberg.guice.HiveConf; +import org.apache.druid.iceberg.input.HiveIcebergCatalog; +import org.apache.druid.iceberg.input.IcebergInputSource; +import org.apache.druid.iceberg.input.LocalCatalog; +import org.apache.druid.initialization.DruidModule; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import java.util.Collections; +import java.util.List; + +public class IcebergDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("IcebergDruidModule") + .registerSubtypes( + new NamedType(HiveIcebergCatalog.class, HiveIcebergCatalog.TYPE_KEY), + new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY), + new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY) + + ) + ); + } + + @Override + public void configure(Binder binder) + { + final Configuration conf = new Configuration(); + conf.setClassLoader(getClass().getClassLoader()); + + ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + FileSystem.get(conf); + } + catch (Exception ex) { + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.UNCATEGORIZED) + .build(ex, "Problem during fileSystem class level initialization"); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + binder.bind(Configuration.class).annotatedWith(HiveConf.class).toInstance(conf); + + } + +} + diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergAndFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergAndFilter.java new file mode 100644 index 
00000000000..c541372b7f4 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergAndFilter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; + +public class IcebergAndFilter implements IcebergFilter +{ + + private final List filters; + + private static final Logger log = new Logger(IcebergAndFilter.class); + + @JsonCreator + public IcebergAndFilter( + @JsonProperty("filters") List filters + ) + { + Preconditions.checkArgument(filters != null && filters.size() > 0, "filter requires atleast one field"); + this.filters = filters; + } + + @JsonProperty + public List getFilters() + { + return filters; + } + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + List expressions = new ArrayList<>(); + LinkedHashSet flatFilters = flattenAndChildren(filters); + for (IcebergFilter filter : flatFilters) { + expressions.add(filter.getFilterExpression()); + } + Expression finalExpr = Expressions.alwaysTrue(); + for (Expression expr : expressions) { + finalExpr = Expressions.and(finalExpr, expr); + } + return finalExpr; + } + + private static LinkedHashSet flattenAndChildren(final Collection filters) + { + final LinkedHashSet retVal = new LinkedHashSet<>(); + + for (IcebergFilter child : filters) { + if (child instanceof IcebergAndFilter) { + retVal.addAll(flattenAndChildren(((IcebergAndFilter) child).getFilters())); + } else { + retVal.add(child); + } + } + return retVal; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergEqualsFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergEqualsFilter.java new file mode 100644 index 00000000000..d742b223814 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergEqualsFilter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + + +public class IcebergEqualsFilter implements IcebergFilter +{ + + @JsonProperty + private final String filterColumn; + + @JsonProperty + private final String filterValue; + + @JsonCreator + public IcebergEqualsFilter( + @JsonProperty("filterColumn") String filterColumn, + @JsonProperty("filterValue") String filterValue + ) + { + Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the equals filter"); + Preconditions.checkNotNull(filterValue, "You must specify a filter value on the equals filter"); + this.filterColumn = filterColumn; + this.filterValue = filterValue; + } + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + return Expressions.equal( + filterColumn, + filterValue + ); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java new file mode 100644 index 00000000000..10c07cdbe24 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; + +/** + * Interface to manage iceberg expressions which can be used to perform filtering on the iceberg table + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "interval", value = IcebergIntervalFilter.class), + @JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class), + @JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class), + @JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class), + @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class) +}) +public interface IcebergFilter +{ + TableScan filter(TableScan tableScan); + + Expression getFilterExpression(); +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergIntervalFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergIntervalFilter.java new file mode 100644 index 00000000000..2e53a1d648a --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergIntervalFilter.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Types; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.List; + +public class IcebergIntervalFilter implements IcebergFilter +{ + @JsonProperty + private final String filterColumn; + + @JsonProperty + private final List intervals; + + @JsonCreator + public IcebergIntervalFilter( + @JsonProperty("filterColumn") String filterColumn, + @JsonProperty("intervals") List intervals + ) + { + Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the interval filter"); + Preconditions.checkArgument(intervals != null && intervals.size() > 0, "You must specify intervals on the interval filter"); + this.filterColumn = filterColumn; + this.intervals = intervals; + } + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + List expressions = new ArrayList<>(); + for (Interval filterInterval : intervals) { + // Converts the input timestamp string into iceberg TimestampType because TimestampType supports microsecond precision. + // This is to ensure that there are no precision mismatches when doing the comparison. + Long dateStart = (long) Literal.of(filterInterval.getStart().toString()) + .to(Types.TimestampType.withZone()) + .value(); + Long dateEnd = (long) Literal.of(filterInterval.getEnd().toString()) + .to(Types.TimestampType.withZone()) + .value(); + + expressions.add(Expressions.and( + Expressions.greaterThanOrEqual( + filterColumn, + dateStart + ), + Expressions.lessThan( + filterColumn, + dateEnd + ) + )); + } + Expression finalExpr = Expressions.alwaysFalse(); + for (Expression filterExpr : expressions) { + finalExpr = Expressions.or(finalExpr, filterExpr); + } + return finalExpr; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergNotFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergNotFilter.java new file mode 100644 index 00000000000..6b4fdd50f2a --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergNotFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + + +public class IcebergNotFilter implements IcebergFilter +{ + private final IcebergFilter filter; + + @JsonCreator + public IcebergNotFilter( + @JsonProperty("filter") IcebergFilter filter + ) + { + Preconditions.checkNotNull(filter, "You must specify an iceberg filter"); + this.filter = filter; + } + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + return Expressions.not(filter.getFilterExpression()); + } + + @JsonProperty + public IcebergFilter getFilter() + { + return filter; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergOrFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergOrFilter.java new file mode 100644 index 00000000000..976be8fcfb6 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergOrFilter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; + +public class IcebergOrFilter implements IcebergFilter +{ + private final List filters; + + private static final Logger log = new Logger(IcebergAndFilter.class); + + @JsonCreator + public IcebergOrFilter( + @JsonProperty("filters") List filters + ) + { + Preconditions.checkArgument(filters != null && filters.size() > 0, "filter requires atleast one field"); + this.filters = filters; + } + + @JsonProperty + public List getFilters() + { + return filters; + } + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + List expressions = new ArrayList<>(); + LinkedHashSet flatFilters = flattenOrChildren(filters); + for (IcebergFilter filter : flatFilters) { + expressions.add(filter.getFilterExpression()); + } + Expression finalExpr = Expressions.alwaysFalse(); + for (Expression expr : expressions) { + finalExpr = Expressions.or(finalExpr, expr); + } + return finalExpr; + } + + private static LinkedHashSet flattenOrChildren(final Collection filters) + { + final LinkedHashSet retVal = new LinkedHashSet<>(); + + for (IcebergFilter child : filters) { + if (child instanceof IcebergOrFilter) { + retVal.addAll(flattenOrChildren(((IcebergOrFilter) child).getFilters())); + } else { + retVal.add(child); + } + } + + return retVal; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/guice/HiveConf.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/guice/HiveConf.java new file mode 100644 index 00000000000..6912513d411 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/guice/HiveConf.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Binding annotation for implementations specific to the iceberg extension module. 
+ * This is required as there can be overlap of classes bound by this extension and the hdfs-storage extension. + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +public @interface HiveConf +{ +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java new file mode 100644 index 00000000000..a22f886cb31 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.iceberg.guice.HiveConf; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.DynamicConfigProviderUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.hive.HiveCatalog; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Map; + +/** + * Hive Metastore specific implementation of iceberg catalog. 
+ * Kerberos authentication is performed if the credentials are provided in the catalog properties + */ +public class HiveIcebergCatalog extends IcebergCatalog +{ + public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; + public static final String TYPE_KEY = "hive"; + + @JsonProperty + private String warehousePath; + + @JsonProperty + private String catalogUri; + + @JsonProperty + private Map catalogProperties; + + private final Configuration configuration; + + private BaseMetastoreCatalog hiveCatalog; + + private static final Logger log = new Logger(HiveIcebergCatalog.class); + + @JsonCreator + public HiveIcebergCatalog( + @JsonProperty("warehousePath") String warehousePath, + @JsonProperty("catalogUri") String catalogUri, + @JsonProperty("catalogProperties") @Nullable + Map catalogProperties, + @JacksonInject @Json ObjectMapper mapper, + @JacksonInject @HiveConf Configuration configuration + ) + { + this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null"); + this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null"); + this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(catalogProperties, DRUID_DYNAMIC_CONFIG_PROVIDER_KEY, mapper); + this.configuration = configuration; + this.catalogProperties + .forEach(this.configuration::set); + this.hiveCatalog = retrieveCatalog(); + } + + @Override + public BaseMetastoreCatalog retrieveCatalog() + { + if (hiveCatalog == null) { + hiveCatalog = setupCatalog(); + } + return hiveCatalog; + } + + private HiveCatalog setupCatalog() + { + HiveCatalog catalog = new HiveCatalog(); + authenticate(); + catalog.setConf(configuration); + catalogProperties.put("warehouse", warehousePath); + catalogProperties.put("uri", catalogUri); + catalog.initialize("hive", catalogProperties); + return catalog; + } + + private void authenticate() + { + String principal = catalogProperties.getOrDefault("principal", null); + String keytab = catalogProperties.getOrDefault("keytab", null); + if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) { + UserGroupInformation.setConfiguration(configuration); + if (UserGroupInformation.isSecurityEnabled()) { + try { + if (UserGroupInformation.getCurrentUser().hasKerberosCredentials() == false + || !UserGroupInformation.getCurrentUser().getUserName().equals(principal)) { + log.info("Hive trying to authenticate user [%s] with keytab [%s]..", principal, keytab); + UserGroupInformation.loginUserFromKeytab(principal, keytab); + } + } + catch (IOException e) { + throw new ISE(e, "Failed to authenticate user principal [%s] with keytab [%s]", principal, keytab); + } + } + } + } + + public String getWarehousePath() + { + return warehousePath; + } + + public String getCatalogUri() + { + return catalogUri; + } + + public Map getCatalogProperties() + { + return catalogProperties; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java new file mode 100644 index 00000000000..e62ab830a6a --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; + +import java.util.ArrayList; +import java.util.List; + +/* + * Druid wrapper for an iceberg catalog. + * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot. + * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files. + */ + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY) +public abstract class IcebergCatalog +{ + private static final Logger log = new Logger(IcebergCatalog.class); + + public abstract BaseMetastoreCatalog retrieveCatalog(); + + /** + * Extract the iceberg data files upto the latest snapshot associated with the table + * + * @param tableNamespace The catalog namespace under which the table is defined + * @param tableName The iceberg table name + * @return a list of data file paths + */ + public List extractSnapshotDataFiles( + String tableNamespace, + String tableName, + IcebergFilter icebergFilter + ) + { + Catalog catalog = retrieveCatalog(); + Namespace namespace = Namespace.of(tableNamespace); + String tableIdentifier = tableNamespace + "." + tableName; + + List dataFilePaths = new ArrayList<>(); + + ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() + .filter(tableId -> tableId.toString().equals(tableIdentifier)) + .findFirst() + .orElseThrow(() -> new IAE( + " Couldn't retrieve table identifier for '%s'. 
Please verify that the table exists in the given catalog", + tableIdentifier + )); + + long start = System.currentTimeMillis(); + TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan(); + + if (icebergFilter != null) { + tableScan = icebergFilter.filter(tableScan); + } + + CloseableIterable tasks = tableScan.planFiles(); + CloseableIterable.transform(tasks, FileScanTask::file) + .forEach(dataFile -> dataFilePaths.add(dataFile.path().toString())); + + long duration = System.currentTimeMillis() - start; + log.info("Data file scan and fetch took [%d ms] time for [%d] paths", duration, dataFilePaths.size()); + } + catch (Exception e) { + throw new RE(e, "Failed to load iceberg table with identifier [%s]", tableIdentifier); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxClassloader); + } + return dataFilePaths; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java new file mode 100644 index 00000000000..81899703dcf --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.AbstractInputSourceBuilder; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.iceberg.filter.IcebergFilter; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; + +/** + * Inputsource to ingest data managed by the Iceberg table format. + * This inputsource talks to the configured catalog, executes any configured filters and retrieves the data file paths upto the latest snapshot associated with the iceberg table. + * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined. 
+ */ +public class IcebergInputSource implements SplittableInputSource> +{ + public static final String TYPE_KEY = "iceberg"; + + @JsonProperty + private final String tableName; + + @JsonProperty + private final String namespace; + + @JsonProperty + private IcebergCatalog icebergCatalog; + + @JsonProperty + private IcebergFilter icebergFilter; + + @JsonProperty + private AbstractInputSourceBuilder warehouseSource; + + private boolean isLoaded = false; + + private SplittableInputSource delegateInputSource; + + @JsonCreator + public IcebergInputSource( + @JsonProperty("tableName") String tableName, + @JsonProperty("namespace") String namespace, + @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter, + @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, + @JsonProperty("warehouseSource") AbstractInputSourceBuilder warehouseSource + ) + { + this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null"); + this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null"); + this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null"); + this.icebergFilter = icebergFilter; + this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null"); + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + if (!isLoaded) { + retrieveIcebergDatafiles(); + } + return getDelegateInputSource().reader(inputRowSchema, inputFormat, temporaryDirectory); + } + + @Override + public Stream>> createSplits( + InputFormat inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) throws IOException + { + if (!isLoaded) { + retrieveIcebergDatafiles(); + } + return getDelegateInputSource().createSplits(inputFormat, splitHintSpec); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException + { + if (!isLoaded) { + retrieveIcebergDatafiles(); + } + return getDelegateInputSource().estimateNumSplits(inputFormat, splitHintSpec); + } + + @Override + public InputSource withSplit(InputSplit> inputSplit) + { + return getDelegateInputSource().withSplit(inputSplit); + } + + @Override + public SplitHintSpec getSplitHintSpecOrDefault(@Nullable SplitHintSpec splitHintSpec) + { + return getDelegateInputSource().getSplitHintSpecOrDefault(splitHintSpec); + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public String getNamespace() + { + return namespace; + } + + @JsonProperty + public IcebergCatalog getIcebergCatalog() + { + return icebergCatalog; + } + + @JsonProperty + public IcebergFilter getIcebergFilter() + { + return icebergFilter; + } + + public SplittableInputSource getDelegateInputSource() + { + return delegateInputSource; + } + + protected void retrieveIcebergDatafiles() + { + List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( + getNamespace(), + getTableName(), + getIcebergFilter() + ); + delegateInputSource = warehouseSource.setupInputSource(snapshotDataFiles); + isLoaded = true; + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java new file mode 100644 index 00000000000..d4961bb0967 --- /dev/null +++ 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/LocalCatalog.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.hadoop.HadoopCatalog; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Objects; + +/** + * Iceberg catalog implementation that handles file based catalogs created in the local filesystem. + */ +public class LocalCatalog extends IcebergCatalog +{ + public static final String TYPE_KEY = "local"; + + @JsonProperty + private final String warehousePath; + + @JsonProperty + private final Map catalogProperties; + + private BaseMetastoreCatalog catalog; + + @JsonCreator + public LocalCatalog( + @JsonProperty("warehousePath") String warehousePath, + @JsonProperty("catalogProperties") @Nullable + Map catalogProperties + ) + { + Preconditions.checkNotNull(warehousePath, "warehousePath is null"); + this.warehousePath = warehousePath; + this.catalogProperties = catalogProperties; + this.catalog = retrieveCatalog(); + + } + + @JsonProperty + public String getWarehousePath() + { + return warehousePath; + } + + @JsonProperty + public Map getCatalogProperties() + { + return catalogProperties; + } + + @Override + public BaseMetastoreCatalog retrieveCatalog() + { + if (catalog == null) { + catalog = setupCatalog(); + } + return catalog; + } + + private HadoopCatalog setupCatalog() + { + HadoopCatalog hadoopCatalog = new HadoopCatalog(); + hadoopCatalog.setConf(new Configuration()); + catalogProperties.put("warehouse", warehousePath); + hadoopCatalog.initialize("hadoop", catalogProperties); + return hadoopCatalog; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LocalCatalog that = (LocalCatalog) o; + return warehousePath.equals(that.warehousePath) + && Objects.equals(catalogProperties, that.catalogProperties); + } + + @Override + public int hashCode() + { + return Objects.hash(warehousePath, catalogProperties); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/druid-iceberg-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 00000000000..b0a3afdc551 --- /dev/null +++ 
b/extensions-contrib/druid-iceberg-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.druid.iceberg.common.IcebergDruidModule \ No newline at end of file diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergAndFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergAndFilterTest.java new file mode 100644 index 00000000000..9d652b93b36 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergAndFilterTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import org.apache.druid.java.util.common.Intervals; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +public class IcebergAndFilterTest +{ + private final String INTERVAL_COLUMN = "eventTime"; + private final String COLUMN1 = "column1"; + private final String COLUMN2 = "column2"; + + private final Expression equalExpression1 = Expressions.equal(COLUMN1, "value1"); + private final Expression equalExpression2 = Expressions.equal(COLUMN2, "value2"); + private final Expression intervalExpression = Expressions.and( + Expressions.greaterThanOrEqual( + INTERVAL_COLUMN, + Literal.of("2022-01-01T00:00:00.000Z") + .to(Types.TimestampType.withZone()) + .value() + ), + Expressions.lessThan( + INTERVAL_COLUMN, + Literal.of("2022-01-02T00:00:00.000Z") + .to(Types.TimestampType.withZone()) + .value() + ) + ); + + @Test + public void testFilter() + { + IcebergAndFilter andFilter = new IcebergAndFilter(Arrays.asList( + new IcebergEqualsFilter(COLUMN1, "value1"), + new IcebergEqualsFilter(COLUMN2, "value2") + )); + Expression expectedExpression = Expressions.and(equalExpression1, equalExpression2); + Assert.assertEquals(expectedExpression.toString(), andFilter.getFilterExpression().toString()); + } + + @Test + public void testEmptyFilter() + { + Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(null)); + Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(Collections.emptyList())); + } + + @Test + public void testNestedFilter() + { + IcebergAndFilter andFilter = new IcebergAndFilter( + Arrays.asList( + new IcebergAndFilter( + Arrays.asList( + new IcebergEqualsFilter(COLUMN1, "value1"), + new IcebergEqualsFilter(COLUMN2, "value2") + )), + new IcebergIntervalFilter( + INTERVAL_COLUMN, + Collections.singletonList(Intervals.of( + "2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z")) + ) + )); + Expression expectedExpression = Expressions.and( + Expressions.and(equalExpression1, equalExpression2), + intervalExpression + ); + Assert.assertEquals(expectedExpression.toString(), andFilter.getFilterExpression().toString()); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergEqualsFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergEqualsFilterTest.java new file mode 100644 index 00000000000..c021a16e865 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergEqualsFilterTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.junit.Assert; +import org.junit.Test; + +public class IcebergEqualsFilterTest +{ + @Test + public void testFilter() + { + IcebergEqualsFilter testFilter = new IcebergEqualsFilter("column1", "value1"); + Expression expectedExpression = Expressions.equal("column1", "value1"); + Assert.assertEquals(expectedExpression.toString(), testFilter.getFilterExpression().toString()); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergIntervalFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergIntervalFilterTest.java new file mode 100644 index 00000000000..0efb9dd739b --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergIntervalFilterTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import org.apache.druid.java.util.common.Intervals; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Types; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class IcebergIntervalFilterTest +{ + @Test + public void testFilter() + { + String intervalColumn = "eventTime"; + List intervals = Arrays.asList( + Intervals.of("2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z"), + Intervals.of("2023-01-01T00:00:00.000Z/2023-01-01T00:00:00.000Z") + ); + + IcebergIntervalFilter intervalFilter = new IcebergIntervalFilter(intervalColumn, intervals); + Expression expectedExpression = Expressions.or( + Expressions.and( + Expressions.greaterThanOrEqual( + intervalColumn, + Literal.of(intervals.get(0).getStart().toString()) + .to(Types.TimestampType.withZone()) + .value() + ), + Expressions.lessThan( + intervalColumn, + Literal.of(intervals.get(0).getEnd().toString()) + .to(Types.TimestampType.withZone()) + .value() + ) + ), + Expressions.and( + Expressions.greaterThanOrEqual( + intervalColumn, + Literal.of(intervals.get(1).getStart().toString()) + .to(Types.TimestampType.withZone()) + .value() + ), + Expressions.lessThan( + intervalColumn, + Literal.of(intervals.get(1).getEnd().toString()) + .to(Types.TimestampType.withZone()) + .value() + ) + ) + ); + Assert.assertEquals(expectedExpression.toString(), intervalFilter.getFilterExpression().toString()); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergNotFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergNotFilterTest.java new file mode 100644 index 00000000000..0d683f512a1 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergNotFilterTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
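Taken together, the filter tests show how each IcebergFilter maps onto an Iceberg Expression. The sketch below (not part of this patch; column names and values are hypothetical) combines an equality filter with an interval filter the way an ingestion spec typically would, and notes where the resulting expression is applied during file planning.

```java
import org.apache.druid.iceberg.filter.IcebergAndFilter;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.iceberg.filter.IcebergIntervalFilter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.iceberg.expressions.Expression;

import java.util.Arrays;
import java.util.Collections;

public class IcebergFilterSketch
{
  public static void main(String[] args)
  {
    // country = 'US' AND eventTime in [2023-01-01, 2023-02-01)
    IcebergFilter filter = new IcebergAndFilter(Arrays.asList(
        new IcebergEqualsFilter("country", "US"),
        new IcebergIntervalFilter(
            "eventTime",
            Collections.singletonList(
                Intervals.of("2023-01-01T00:00:00.000Z/2023-02-01T00:00:00.000Z"))
        )
    ));

    // The same expression is what the catalog pushes into the Iceberg table scan
    // (icebergFilter.filter(tableScan)) before planFiles() is called.
    Expression expression = filter.getFilterExpression();
    System.out.println(expression);
  }
}
```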
+ */ + +package org.apache.druid.iceberg.filter; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class IcebergNotFilterTest +{ + @Test + public void testFilter() + { + IcebergNotFilter testFilter = new IcebergNotFilter(new IcebergEqualsFilter("column1", "value1")); + Expression expectedExpression = Expressions.not(Expressions.equal("column1", "value1")); + Assert.assertEquals(expectedExpression.toString(), testFilter.getFilterExpression().toString()); + } + + @Test + public void testNestedFilters() + { + String column1 = "column1"; + String column2 = "column2"; + + IcebergNotFilter filterNotAnd = new IcebergNotFilter( + new IcebergAndFilter(Arrays.asList( + new IcebergEqualsFilter( + column1, + "value1" + ), + new IcebergEqualsFilter( + column2, + "value2" + ) + )) + ); + Expression expressionNotAnd = Expressions.not(Expressions.and( + Expressions.equal(column1, "value1"), + Expressions.equal(column2, "value2") + )); + + IcebergNotFilter filterNotOr = new IcebergNotFilter( + new IcebergOrFilter(Arrays.asList( + new IcebergEqualsFilter( + column1, + "value1" + ), + new IcebergEqualsFilter( + column2, + "value2" + ) + )) + ); + Expression expressionNotOr = Expressions.not(Expressions.or( + Expressions.equal(column1, "value1"), + Expressions.equal(column2, "value2") + )); + + Assert.assertEquals(expressionNotAnd.toString(), filterNotAnd.getFilterExpression().toString()); + Assert.assertEquals(expressionNotOr.toString(), filterNotOr.getFilterExpression().toString()); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergOrFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergOrFilterTest.java new file mode 100644 index 00000000000..417cdc8d233 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergOrFilterTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.filter; + +import org.apache.druid.java.util.common.Intervals; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +public class IcebergOrFilterTest +{ + private final String INTERVAL_COLUMN = "eventTime"; + private final String COLUMN1 = "column1"; + private final String COLUMN2 = "column2"; + + private final Expression equalExpression1 = Expressions.equal(COLUMN1, "value1"); + private final Expression equalExpression2 = Expressions.equal(COLUMN2, "value2"); + private final Expression intervalExpression = Expressions.and( + Expressions.greaterThanOrEqual( + INTERVAL_COLUMN, + Literal.of("2022-01-01T00:00:00.000Z") + .to(Types.TimestampType.withZone()) + .value() + ), + Expressions.lessThan( + INTERVAL_COLUMN, + Literal.of("2022-01-02T00:00:00.000Z") + .to(Types.TimestampType.withZone()) + .value() + ) + ); + + @Test + public void testFilter() + { + IcebergOrFilter orFilter = new IcebergOrFilter(Arrays.asList( + new IcebergEqualsFilter(COLUMN1, "value1"), + new IcebergEqualsFilter(COLUMN2, "value2") + )); + Expression expectedExpression = Expressions.or(equalExpression1, equalExpression2); + Assert.assertEquals(expectedExpression.toString(), orFilter.getFilterExpression().toString()); + } + + @Test + public void testEmptyFilter() + { + Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(null)); + Assert.assertThrows(IllegalArgumentException.class, () -> new IcebergAndFilter(Collections.emptyList())); + } + + @Test + public void testNestedFilters() + { + IcebergOrFilter filterOrOr = new IcebergOrFilter( + Arrays.asList( + new IcebergOrFilter( + Arrays.asList( + new IcebergEqualsFilter(COLUMN1, "value1"), + new IcebergEqualsFilter(COLUMN2, "value2") + )), + new IcebergIntervalFilter( + INTERVAL_COLUMN, + Collections.singletonList(Intervals.of( + "2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z")) + ) + )); + Expression expectedExpressionOrOr = Expressions.or( + Expressions.or(equalExpression1, equalExpression2), + intervalExpression + ); + + IcebergOrFilter filterOrAnd = new IcebergOrFilter( + Arrays.asList( + new IcebergAndFilter( + Arrays.asList( + new IcebergEqualsFilter(COLUMN1, "value1"), + new IcebergEqualsFilter(COLUMN2, "value2") + )), + new IcebergIntervalFilter( + INTERVAL_COLUMN, + Collections.singletonList(Intervals.of( + "2022-01-01T00:00:00.000Z/2022-01-02T00:00:00.000Z")) + ) + )); + Expression expectedExpressionOrAnd = Expressions.or( + Expressions.and(equalExpression1, equalExpression2), + intervalExpression + ); + + Assert.assertEquals(expectedExpressionOrOr.toString(), filterOrOr.getFilterExpression().toString()); + Assert.assertEquals(expectedExpressionOrAnd.toString(), filterOrAnd.getFilterExpression().toString()); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java new file mode 100644 index 00000000000..def778ee906 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/HiveIcebergCatalogTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; + +public class HiveIcebergCatalogTest +{ + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @Test + public void testCatalogCreate() + { + final File warehouseDir = FileUtils.createTempDir(); + HiveIcebergCatalog hiveCatalog = new HiveIcebergCatalog( + warehouseDir.getPath(), + "hdfs://testuri", + new HashMap<>(), + mapper, + new Configuration() + ); + HiveIcebergCatalog hiveCatalogNullProps = new HiveIcebergCatalog( + warehouseDir.getPath(), + "hdfs://testuri", + null, + mapper, + new Configuration() + ); + Assert.assertEquals("hive", hiveCatalog.retrieveCatalog().name()); + Assert.assertEquals(2, hiveCatalogNullProps.getCatalogProperties().size()); + } + + @Test + public void testAuthenticate() + { + UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"})); + final File warehouseDir = FileUtils.createTempDir(); + HashMap catalogMap = new HashMap<>(); + catalogMap.put("principal", "test"); + catalogMap.put("keytab", "test"); + HiveIcebergCatalog hiveCatalog = new HiveIcebergCatalog( + warehouseDir.getPath(), + "hdfs://testuri", + catalogMap, + mapper, + new Configuration() + ); + Assert.assertEquals("hdfs://testuri", hiveCatalog.getCatalogUri()); + + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java new file mode 100644 index 00000000000..9c3de4922b1 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.LocalInputSourceBuilder; +import org.apache.druid.iceberg.filter.IcebergEqualsFilter; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class IcebergInputSourceTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + IcebergCatalog testCatalog; + + Schema tableSchema = new Schema( + Types.NestedField.required(1, "id", Types.StringType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()) + ); + Map tableData = ImmutableMap.of("id", "123988", "name", "Foo"); + + private static final String NAMESPACE = "default"; + private static final String TABLENAME = "foosTable"; + + @Test + public void testInputSource() throws IOException + { + final File warehouseDir = FileUtils.createTempDir(); + testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>()); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME); + + createAndLoadTable(tableIdentifier); + + IcebergInputSource inputSource = new IcebergInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + new LocalInputSourceBuilder() + ); + Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); + List localInputSourceList = splits.map(inputSource::withSplit) + .map(inpSource -> (LocalInputSource) inpSource) + .map(LocalInputSource::getFiles) + .flatMap(List::stream) + .collect(Collectors.toList()); + + Assert.assertEquals(1, inputSource.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null))); + Assert.assertEquals(1, localInputSourceList.size()); + CloseableIterable datafileReader = Parquet.read(Files.localInput(localInputSourceList.get(0))) + .project(tableSchema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader( + tableSchema, + fileSchema + )) + .build(); + + + for (Record record : datafileReader) { + Assert.assertEquals(tableData.get("id"), record.get(0)); + Assert.assertEquals(tableData.get("name"), record.get(1)); + } + dropTableFromCatalog(tableIdentifier); + } + + @Test + public void 
testInputSourceWithFilter() throws IOException + { + final File warehouseDir = FileUtils.createTempDir(); + testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>()); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME); + + createAndLoadTable(tableIdentifier); + + IcebergInputSource inputSource = new IcebergInputSource( + TABLENAME, + NAMESPACE, + new IcebergEqualsFilter("id", "0000"), + testCatalog, + new LocalInputSourceBuilder() + ); + Stream>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null)); + List localInputSourceList = splits.map(inputSource::withSplit) + .map(inpSource -> (LocalInputSource) inpSource) + .map(LocalInputSource::getFiles) + .flatMap(List::stream) + .collect(Collectors.toList()); + + Assert.assertEquals(0, localInputSourceList.size()); + dropTableFromCatalog(tableIdentifier); + } + + private void createAndLoadTable(TableIdentifier tableIdentifier) throws IOException + { + //Setup iceberg table and schema + Table icebergTableFromSchema = testCatalog.retrieveCatalog().createTable(tableIdentifier, tableSchema); + + //Generate an iceberg record and write it to a file + GenericRecord record = GenericRecord.create(tableSchema); + ImmutableList.Builder builder = ImmutableList.builder(); + + builder.add(record.copy(tableData)); + String filepath = icebergTableFromSchema.location() + "/" + UUID.randomUUID(); + OutputFile file = icebergTableFromSchema.io().newOutputFile(filepath); + DataWriter dataWriter = + Parquet.writeData(file) + .schema(tableSchema) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + + try { + for (GenericRecord genRecord : builder.build()) { + dataWriter.write(genRecord); + } + } + finally { + dataWriter.close(); + } + DataFile dataFile = dataWriter.toDataFile(); + + //Add the data file to the iceberg table + icebergTableFromSchema.newAppend().appendFile(dataFile).commit(); + + } + + private void dropTableFromCatalog(TableIdentifier tableIdentifier) + { + testCatalog.retrieveCatalog().dropTable(tableIdentifier); + } + +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java new file mode 100644 index 00000000000..b0a7b5528a1 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/LocalCatalogTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.core.JsonProcessingException; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; + +public class LocalCatalogTest +{ + @Test + public void testCatalogSerDe() throws JsonProcessingException + { + final File warehouseDir = FileUtils.createTempDir(); + DefaultObjectMapper mapper = new DefaultObjectMapper(); + LocalCatalog before = new LocalCatalog(warehouseDir.getPath(), new HashMap<>()); + LocalCatalog after = mapper.readValue( + mapper.writeValueAsString(before), LocalCatalog.class); + Assert.assertEquals(before, after); + Assert.assertEquals("hadoop", before.retrieveCatalog().name()); + Assert.assertEquals("hadoop", after.retrieveCatalog().name()); + } + + @Test + public void testEqualsContract() + { + EqualsVerifier.forClass(LocalCatalog.class).withNonnullFields("warehousePath").withIgnoredFields("catalog").usingGetClass().verify(); + } +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java new file mode 100644 index 00000000000..94b67ca93dc --- /dev/null +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.inputsource.hdfs; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.druid.data.input.AbstractInputSourceBuilder; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.guice.Hdfs; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder +{ + private final Configuration configuration; + private final HdfsInputSourceConfig inputSourceConfig; + + @JsonCreator + public HdfsInputSourceBuilder( + @JacksonInject @Hdfs Configuration configuration, + @JacksonInject HdfsInputSourceConfig inputSourceConfig + ) + { + this.configuration = configuration; + this.inputSourceConfig = inputSourceConfig; + } + + @Override + public SplittableInputSource generateInputSource(List inputFilePaths) + { + return new HdfsInputSource(inputFilePaths, configuration, inputSourceConfig); + } +} diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java index 6a5f96b0d58..0923450c5fb 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java @@ -34,6 +34,7 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.initialization.DruidModule; import org.apache.druid.inputsource.hdfs.HdfsInputSource; +import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder; import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs; import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; @@ -65,7 +66,8 @@ public class HdfsStorageDruidModule implements DruidModule return Collections.singletonList( new SimpleModule().registerSubtypes( new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME), - new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME) + new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME), + new NamedType(HdfsInputSourceBuilder.class, HdfsStorageDruidModule.SCHEME) ) ); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java new file mode 100644 index 00000000000..855a67ac722 --- /dev/null +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
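For a non-local warehouse, the Hive metastore catalog and the HDFS builder plug into the same input source. A rough sketch follows (not part of this patch); the metastore URI, warehouse location and table identifiers are hypothetical, and in a real deployment the Hadoop Configuration and HdfsInputSourceConfig would arrive through Jackson injection rather than being constructed by hand as the unit tests do.

```java
import org.apache.druid.iceberg.input.HiveIcebergCatalog;
import org.apache.druid.iceberg.input.IcebergInputSource;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;

public class HdfsWarehouseSketch
{
  public static IcebergInputSource build()
  {
    // Catalog backed by a Hive metastore; properties such as principal/keytab could be added
    // to the map for Kerberized clusters, as HiveIcebergCatalogTest above demonstrates.
    HiveIcebergCatalog catalog = new HiveIcebergCatalog(
        "hdfs://namenode:8020/warehouse",   // hypothetical warehouse location
        "thrift://metastore:9083",          // hypothetical metastore URI
        new HashMap<>(),
        new DefaultObjectMapper(),
        new Configuration()
    );

    // The HDFS builder reuses the "hdfs" type key registered in HdfsStorageDruidModule above.
    return new IcebergInputSource(
        "events",
        "default",
        null,
        catalog,
        new HdfsInputSourceBuilder(new Configuration(), new HdfsInputSourceConfig(null))
    );
  }
}
```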
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.inputsource.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class HdfsInputSourceAdapterTest +{ + @Test + public void testAdapterGet() + { + Configuration conf = new Configuration(); + HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null); + HdfsInputSourceBuilder hdfsInputSourceAdapter = new HdfsInputSourceBuilder(conf, inputSourceConfig); + Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet", "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource); + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java new file mode 100644 index 00000000000..1755cb041c5 --- /dev/null +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.s3; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.aws.AWSClientConfig; +import org.apache.druid.common.aws.AWSEndpointConfig; +import org.apache.druid.common.aws.AWSProxyConfig; +import org.apache.druid.data.input.AbstractInputSourceBuilder; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.storage.s3.S3InputDataConfig; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; + +import javax.annotation.Nullable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.stream.Collectors; + +public class S3InputSourceBuilder extends AbstractInputSourceBuilder +{ + private final ServerSideEncryptingAmazonS3 s3Client; + private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder; + private final S3InputSourceConfig s3InputSourceConfig; + private final S3InputDataConfig inputDataConfig; + private final AWSCredentialsProvider awsCredentialsProvider; + private final AWSProxyConfig awsProxyConfig; + private final AWSClientConfig awsClientConfig; + private final AWSEndpointConfig awsEndpointConfig; + + @JsonCreator + public S3InputSourceBuilder( + @JacksonInject ServerSideEncryptingAmazonS3 s3Client, + @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder, + @JacksonInject S3InputDataConfig inputDataConfig, + @JacksonInject AWSCredentialsProvider awsCredentialsProvider, + @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig, + @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig, + @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig awsEndpointConfig, + @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig + ) + { + this.s3Client = s3Client; + this.s3ClientBuilder = s3ClientBuilder; + this.inputDataConfig = inputDataConfig; + this.awsCredentialsProvider = awsCredentialsProvider; + this.s3InputSourceConfig = s3InputSourceConfig; + this.awsProxyConfig = awsProxyConfig; + this.awsEndpointConfig = awsEndpointConfig; + this.awsClientConfig = awsClientConfig; + } + + @Override + public SplittableInputSource generateInputSource(List inputFilePaths) + { + return new S3InputSource( + s3Client, + s3ClientBuilder, + inputDataConfig, + awsCredentialsProvider, + inputFilePaths.stream().map(chosenPath -> { + try { + return new URI(StringUtils.replace(chosenPath, "s3a://", "s3://")); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + }).collect( + Collectors.toList()), + null, + null, + null, + s3InputSourceConfig, + awsProxyConfig, + awsEndpointConfig, + awsClientConfig + ); + } + + @Nullable + @JsonProperty("properties") + @JsonInclude(JsonInclude.Include.NON_NULL) + public S3InputSourceConfig getS3InputSourceConfig() + { + return s3InputSourceConfig; + } + + @Nullable + @JsonProperty("proxyConfig") + @JsonInclude(JsonInclude.Include.NON_NULL) + public AWSProxyConfig getAwsProxyConfig() + { + return awsProxyConfig; + } + + @Nullable + @JsonProperty("clientConfig") + @JsonInclude(JsonInclude.Include.NON_NULL) + public AWSClientConfig getAwsClientConfig() + { + return awsClientConfig; + } + + @Nullable + @JsonProperty("endpointConfig") + 
@JsonInclude(JsonInclude.Include.NON_NULL) + public AWSEndpointConfig getAwsEndpointConfig() + { + return awsEndpointConfig; + } + +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java index 241c7c16409..b8227d19456 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java @@ -38,7 +38,10 @@ public class S3InputSourceDruidModule implements DruidModule public List getJacksonModules() { return ImmutableList.of( - new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME)) + new SimpleModule().registerSubtypes( + new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME), + new NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME) + ) ); } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java new file mode 100644 index 00000000000..bb42975cd61 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.s3; + +import org.apache.druid.storage.s3.S3InputDataConfig; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class S3InputSourceBuilderTest +{ + @Test + public void testAdapterGet() + { + ServerSideEncryptingAmazonS3.Builder serverSides3Builder = + EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class); + ServerSideEncryptingAmazonS3 service = EasyMock.createMock(ServerSideEncryptingAmazonS3.class); + S3InputDataConfig dataConfig = EasyMock.createMock(S3InputDataConfig.class); + List fileUris = Arrays.asList( + "s3://foo/bar/file.csv", + "s3://bar/foo/file2.csv", + "s3://bar/foo/file3.txt" + ); + + S3InputSourceBuilder s3Builder = new S3InputSourceBuilder( + service, + serverSides3Builder, + dataConfig, + null, + null, + null, + null, + null + ); + Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof S3InputSource); + } +} diff --git a/pom.xml b/pom.xml index 993e9d3d413..5ab6b368a64 100644 --- a/pom.xml +++ b/pom.xml @@ -221,6 +221,7 @@ extensions-contrib/prometheus-emitter extensions-contrib/opentelemetry-emitter extensions-contrib/kubernetes-overlord-extensions + extensions-contrib/druid-iceberg-extensions distribution @@ -259,7 +260,6 @@ - diff --git a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java new file mode 100644 index 00000000000..a0af825c1a9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.impl.LocalInputSourceBuilder; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * A wrapper on top of {@link SplittableInputSource} that handles input source creation. + * For composing input sources such as IcebergInputSource, the delegate input source instantiation might fail upon deserialization since the input file paths + * are not available yet and this might fail the input source precondition checks. 
+ * This adapter helps create the delegate input source once the input file paths are fully determined. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value = LocalInputSourceBuilder.class) +}) +public abstract class AbstractInputSourceBuilder +{ + public abstract SplittableInputSource generateInputSource(List inputFilePaths); + + public SplittableInputSource setupInputSource(List inputFilePaths) + { + if (inputFilePaths.isEmpty()) { + return new EmptyInputSource(); + } else { + return generateInputSource(inputFilePaths); + } + } + + /** + * This input source is used in place of a delegate input source if there are no input file paths. + * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource + * may use this input source as delegate in such cases. + */ + private static class EmptyInputSource implements SplittableInputSource + { + @Override + public boolean needsFormat() + { + return false; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + return new InputSourceReader() + { + @Override + public CloseableIterator read(InputStats inputStats) + { + return CloseableIterators.wrap(Collections.emptyIterator(), () -> { + }); + } + + @Override + public CloseableIterator sample() + { + return CloseableIterators.wrap(Collections.emptyIterator(), () -> { + }); + } + }; + } + + @Override + public Stream createSplits( + InputFormat inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) + { + return Stream.empty(); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return 0; + } + + @Override + public InputSource withSplit(InputSplit split) + { + return null; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java new file mode 100644 index 00000000000..6e2f44b5674 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
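Since the warehouseSource property of the iceberg input source is deserialized into one of these builders by type key, a small sketch (not part of this patch, added for illustration) may help show the resolution and the empty-path fallback described above. It relies only on the Jackson annotations on AbstractInputSourceBuilder and the "local" type key of LocalInputSourceBuilder, which is defined next.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;

import java.util.Arrays;
import java.util.Collections;

public class WarehouseSourceSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();

    // The "type" field selects the builder subtype, just like the warehouseSource
    // block of an iceberg ingestion spec.
    AbstractInputSourceBuilder builder =
        mapper.readValue("{\"type\": \"local\"}", AbstractInputSourceBuilder.class);

    // With resolved data file paths, the builder produces a real delegate input source.
    SplittableInputSource withFiles =
        builder.setupInputSource(Arrays.asList("/tmp/a.parquet", "/tmp/b.parquet"));

    // With no matching data files, the private empty input source is returned instead,
    // so split planning yields zero splits rather than tripping precondition checks.
    SplittableInputSource empty = builder.setupInputSource(Collections.emptyList());
    System.out.println(withFiles.isSplittable() + " / " + empty.estimateNumSplits(null, null));
  }
}
```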
+ */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.AbstractInputSourceBuilder; + +import java.io.File; +import java.util.List; +import java.util.stream.Collectors; + +public class LocalInputSourceBuilder extends AbstractInputSourceBuilder +{ + public static final String TYPE_KEY = "local"; + + @Override + public LocalInputSource generateInputSource(List inputFilePaths) + { + return new LocalInputSource( + null, + null, + inputFilePaths.stream().map(chosenPath -> new File(chosenPath)).collect( + Collectors.toList()) + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java new file mode 100644 index 00000000000..78a33c05b46 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class LocalInputSourceAdapterTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testAdapterGet() + { + LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder(); + Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList( + "foo.parquet", + "bar.parquet" + )) instanceof LocalInputSource); + } + + @Test + public void testAdapterSetup() + { + LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder(); + InputSource delegateInputSource = localInputSourceAdapter.setupInputSource(Arrays.asList( + "foo.parquet", + "bar.parquet" + )); + Assert.assertTrue(delegateInputSource instanceof LocalInputSource); + } + + @Test + public void testEmptyInputSource() throws IOException + { + InputFormat mockFormat = EasyMock.createMock(InputFormat.class); + SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class); + LocalInputSourceBuilder localInputSourceAdapter = new LocalInputSourceBuilder(); + SplittableInputSource emptyInputSource = + (SplittableInputSource) localInputSourceAdapter.setupInputSource(Collections.emptyList()); + List> splitList = 
emptyInputSource + .createSplits(mockFormat, mockSplitHint) + .collect(Collectors.toList()); + Assert.assertTrue(splitList.isEmpty()); + Assert.assertFalse(emptyInputSource.isSplittable()); + Assert.assertFalse(emptyInputSource.needsFormat()); + Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class))); + Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat, mockSplitHint)); + Assert.assertFalse(emptyInputSource.reader( + EasyMock.createMock(InputRowSchema.class), + mockFormat, + temporaryFolder.newFolder() + ).read().hasNext()); + } +} diff --git a/website/.spelling b/website/.spelling index 7674ad0e6de..984a658bca1 100644 --- a/website/.spelling +++ b/website/.spelling @@ -109,6 +109,8 @@ html HyperLogLog IAM IANA +IcebergFilter +IcebergInputSource IETF IP IPv4 @@ -312,6 +314,8 @@ featureSpec findColumnsFromHeader filenames filesystem +filterColumn +filterValue firefox firehose firehoses @@ -375,6 +379,7 @@ max_map_count memcached mergeable metadata +metastores millis microbatch microbatches @@ -483,6 +488,7 @@ skipHeaderRows Smoosh smoosh smooshed +snapshotting splittable ssl sslmode @@ -1226,6 +1232,8 @@ pom.xml 0.7.x 0.7.x. TimeAndDims +catalogProperties +catalogUri column2 column_1 column_n @@ -1234,6 +1242,8 @@ ctrl descriptorString headerFormat headerLabelPrefix +icebergFilter +icebergCatalog jsonLowercase kafka KafkaStringHeaderFormat @@ -1306,6 +1316,7 @@ segmentOutputPath segmentTable shardSpec single_dim +tableName targetPartitionSize targetRowsPerSegment useCombiner @@ -1398,6 +1409,8 @@ BUILD_SEGMENTS DETERMINE_PARTITIONS forceTimeChunkLock taskLockTimeout +warehouseSource +warehousePath index.md DOUBLE_ARRAY DOY