mirror of https://github.com/apache/druid.git
Adds support to the Iceberg input source to read from Iceberg REST catalogs. Co-authored-by: Atul Mohan <atulmohan.mec@gmail.com>
parent b7cc0bb343
commit 0ae9988796
@@ -31,10 +31,11 @@ Apache Iceberg is an open table format for huge analytic datasets. [IcebergInput
 Iceberg manages most of its metadata in metadata files in the object storage. However, it is still dependent on a metastore to manage a certain amount of metadata.
 Iceberg refers to these metastores as catalogs. The Iceberg extension lets you connect to the following Iceberg catalog types:
+* REST-based catalog
 * Hive metastore catalog
 * Local catalog

-Druid does not support AWS Glue and REST based catalogs yet.
+Druid does not support the AWS Glue catalog yet.

 For a given catalog, the Iceberg input source reads the table name from the catalog, applies the filters, and extracts all the underlying live data files up to the latest snapshot.
 The data files can be in Parquet, ORC, or Avro formats. The data files typically reside in a warehouse location, which can be in HDFS, S3, or the local filesystem.
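To ground that traversal in the Iceberg API, here is a hedged sketch, not code from this commit: the input source's work amounts to resolving the table through a catalog and planning the live data files of the latest snapshot. The `db.sales` identifier is a placeholder.

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;

// Sketch: resolve a table through a catalog and list the live data files of
// the latest snapshot, the same traversal the input source performs.
public class LiveFilesSketch
{
  public static void listLiveFiles(Catalog catalog) throws Exception
  {
    Table table = catalog.loadTable(TableIdentifier.of("db", "sales")); // placeholder name
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path()); // Parquet/ORC/Avro file locations
      }
    }
  }
}
```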
@@ -110,6 +111,11 @@ Since the Hadoop AWS connector uses the `s3a` filesystem client, specify the war
 The local catalog type can be used for catalogs configured on the local filesystem. Set the `icebergCatalog` type to `local`. You can use this catalog for demos or localized tests. It is not recommended for production use cases.
 The `warehouseSource` is set to `local` because this catalog only supports reading from a local filesystem.

+## REST catalog
+
+To connect to an Iceberg REST catalog server, configure the `icebergCatalog` type as `rest`. The Iceberg REST OpenAPI spec gives catalogs greater control over the implementation, and in most cases the `warehousePath` does not have to be provided by the client.
+Security credentials may be provided in the `catalogProperties` object.
+
 ## Downloading Iceberg extension

 To download `druid-iceberg-extensions`, run the following command after replacing `<VERSION>` with the desired
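The added docs describe the fields but stop short of a sample entry at this point. As a hedged illustration only, the JSON shape of such a spec entry might look like the string below; the endpoint and the `token` credential key are invented placeholders, not documented values.

```java
public class RestCatalogSpecSample
{
  // Hypothetical `icebergCatalog` entry for an ingestion spec. The URI and
  // the "token" key are placeholders; see the `rest` property table below.
  public static final String SAMPLE =
      "{\n"
      + "  \"type\": \"rest\",\n"
      + "  \"catalogUri\": \"http://localhost:8181\",\n"
      + "  \"catalogProperties\": {\"token\": \"...\"}\n"
      + "}";
}
```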
@@ -1063,7 +1063,7 @@ The following is a sample spec for a S3 warehouse source:

 ### Catalog Object

-The catalog object supports `local` and `hive` catalog types.
+The catalog object supports `rest`, `hive` and `local` catalog types.

 The following table lists the properties of a `local` catalog:
@@ -1084,9 +1084,18 @@ The following table lists the properties of a `hive` catalog:
 |catalogProperties|Map of any additional properties that needs to be attached to the catalog.|None|no|
 |caseSensitive|Toggle case sensitivity for column names during Iceberg table reads.|true|no|

+The following table lists the properties of a `rest` catalog:
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|---------|
+|type|Set this value to `rest`.|None|yes|
+|catalogUri|The URI associated with the catalog's HTTP endpoint.|None|yes|
+|catalogProperties|Map of any additional properties that need to be attached to the catalog.|None|no|
+
 ### Iceberg filter object

 This input source provides the following filters: `and`, `equals`, `interval`, and `or`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
 It is strongly recommended to apply filtering only on Iceberg partition columns. When filtering on non-partition columns, Iceberg filters may return rows that do not fully match the expression. To address this, it may help to define an additional filter in the [`transformSpec`](./ingestion-spec.md#transformspec) to remove residual rows.
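To see why residual rows appear, consider this hedged sketch of what the filters translate to on an Iceberg scan (column names and values are placeholders): Iceberg evaluates the expression against per-file column statistics, so a data file is either skipped or returned whole, never trimmed row by row.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;

// Sketch: roughly what `equals` and `and` filters become on the scan.
public class FilterSketch
{
  static TableScan filteredScan(Table table)
  {
    // Pruning is file-level: a data file whose min/max stats admit the
    // predicate is returned whole, so non-partition-column predicates can
    // leave residual rows for the transformSpec to remove.
    return table.newScan().filter(
        Expressions.and(
            Expressions.equal("country", "US"),      // placeholder column/value
            Expressions.equal("status", "complete")  // placeholder column/value
        )
    );
  }
}
```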

 `equals` Filter:
@@ -35,7 +35,7 @@
 <modelVersion>4.0.0</modelVersion>

 <properties>
-  <iceberg.core.version>1.4.1</iceberg.core.version>
+  <iceberg.core.version>1.6.1</iceberg.core.version>
   <hive.version>3.1.3</hive.version>
 </properties>
 <dependencies>
@@ -258,10 +258,6 @@
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
     </exclusion>
-    <exclusion>
-      <groupId>org.apache.httpcomponents.client5</groupId>
-      <artifactId>httpclient5</artifactId>
-    </exclusion>
   </exclusions>
 </dependency>
@@ -28,6 +28,7 @@ import org.apache.druid.iceberg.guice.HiveConf;
 import org.apache.druid.iceberg.input.HiveIcebergCatalog;
 import org.apache.druid.iceberg.input.IcebergInputSource;
 import org.apache.druid.iceberg.input.LocalCatalog;
+import org.apache.druid.iceberg.input.RestIcebergCatalog;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;

@@ -45,6 +46,7 @@ public class IcebergDruidModule implements DruidModule
         .registerSubtypes(
             new NamedType(HiveIcebergCatalog.class, HiveIcebergCatalog.TYPE_KEY),
             new NamedType(LocalCatalog.class, LocalCatalog.TYPE_KEY),
+            new NamedType(RestIcebergCatalog.class, RestIcebergCatalog.TYPE_KEY),
             new NamedType(IcebergInputSource.class, IcebergInputSource.TYPE_KEY)
         )
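For context, `registerSubtypes(new NamedType(...))` is the Jackson mechanism that binds a `type` discriminator string in the JSON spec to a concrete class, which is exactly how `"type": "rest"` now resolves to `RestIcebergCatalog`. A minimal standalone sketch of the same mechanism, with made-up `Shape`/`Circle` types:

```java
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

// Standalone sketch of Jackson polymorphic binding via NamedType.
public class NamedTypeSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface Shape {}

  public static class Circle implements Shape
  {
    public double radius;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerSubtypes(new NamedType(Circle.class, "circle"));
    // The "type" key selects the registered subtype, as "rest" does above.
    Shape shape = mapper.readValue("{\"type\":\"circle\",\"radius\":2.0}", Shape.class);
    System.out.println(shape instanceof Circle); // true
  }
}
```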
@@ -32,7 +32,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.utils.DynamicConfigProviderUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hive.HiveCatalog;

 import javax.annotation.Nullable;

@@ -45,7 +45,6 @@ import java.util.Map;
  */
 public class HiveIcebergCatalog extends IcebergCatalog
 {
-  public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
   public static final String TYPE_KEY = "hive";

   @JsonProperty

@@ -62,7 +61,7 @@ public class HiveIcebergCatalog extends IcebergCatalog

   private final Configuration configuration;

-  private BaseMetastoreCatalog hiveCatalog;
+  private Catalog hiveCatalog;

   private static final Logger log = new Logger(HiveIcebergCatalog.class);

@@ -88,7 +87,7 @@ public class HiveIcebergCatalog extends IcebergCatalog
   }

   @Override
-  public BaseMetastoreCatalog retrieveCatalog()
+  public Catalog retrieveCatalog()
   {
     if (hiveCatalog == null) {
       hiveCatalog = setupCatalog();
@@ -25,7 +25,6 @@ import org.apache.druid.iceberg.filter.IcebergFilter;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.iceberg.BaseMetastoreCatalog;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;

@@ -46,9 +45,10 @@ import java.util.List;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
 public abstract class IcebergCatalog
 {
+  public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
   private static final Logger log = new Logger(IcebergCatalog.class);

-  public abstract BaseMetastoreCatalog retrieveCatalog();
+  public abstract Catalog retrieveCatalog();

   public boolean isCaseSensitive()
   {
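This widening from `BaseMetastoreCatalog` to the `Catalog` interface is what makes the REST catalog possible: Iceberg's `RESTCatalog` implements `Catalog` but does not extend `BaseMetastoreCatalog`. A hedged sketch of a custom subclass against the new contract, assuming `retrieveCatalog()` is the only abstract method (as the hunk above suggests) and using Iceberg's test-oriented `InMemoryCatalog` purely for illustration:

```java
import java.util.HashMap;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;

// Sketch only, not part of the commit: any Catalog implementation can now
// back an IcebergCatalog subclass, metastore-based or not.
public class InMemoryIcebergCatalog extends IcebergCatalog
{
  private Catalog catalog;

  @Override
  public Catalog retrieveCatalog()
  {
    if (catalog == null) {
      InMemoryCatalog inMemory = new InMemoryCatalog();
      inMemory.initialize("in-memory", new HashMap<>());
      catalog = inMemory;
    }
    return catalog;
  }
}
```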
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hadoop.HadoopCatalog;

 import javax.annotation.Nullable;

@@ -46,7 +46,7 @@ public class LocalCatalog extends IcebergCatalog
   @JsonProperty
   private final Boolean caseSensitive;

-  private BaseMetastoreCatalog catalog;
+  private Catalog catalog;

   @JsonCreator
   public LocalCatalog(

@@ -83,7 +83,7 @@ public class LocalCatalog extends IcebergCatalog
   }

   @Override
-  public BaseMetastoreCatalog retrieveCatalog()
+  public Catalog retrieveCatalog()
   {
     if (catalog == null) {
       catalog = setupCatalog();
@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.iceberg.input;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.iceberg.guice.HiveConf;
import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;

import javax.annotation.Nullable;
import java.util.Map;

/**
 * Catalog implementation for Iceberg REST catalogs.
 */
public class RestIcebergCatalog extends IcebergCatalog
{
  public static final String TYPE_KEY = "rest";

  @JsonProperty
  private final String catalogUri;

  @JsonProperty
  private final Map<String, String> catalogProperties;

  private final Configuration configuration;

  private Catalog restCatalog;

  @JsonCreator
  public RestIcebergCatalog(
      @JsonProperty("catalogUri") String catalogUri,
      @JsonProperty("catalogProperties") @Nullable
      Map<String, Object> catalogProperties,
      @JacksonInject @Json ObjectMapper mapper,
      @JacksonInject @HiveConf Configuration configuration
  )
  {
    if (catalogUri == null) {
      throw InvalidInput.exception("catalogUri cannot be null");
    }
    this.catalogUri = catalogUri;
    this.catalogProperties = DynamicConfigProviderUtils.extraConfigAndSetStringMap(
        catalogProperties,
        DRUID_DYNAMIC_CONFIG_PROVIDER_KEY,
        mapper
    );
    this.configuration = configuration;
  }

  @Override
  public Catalog retrieveCatalog()
  {
    if (restCatalog == null) {
      restCatalog = setupCatalog();
    }
    return restCatalog;
  }

  public String getCatalogUri()
  {
    return catalogUri;
  }

  public Map<String, String> getCatalogProperties()
  {
    return catalogProperties;
  }

  private RESTCatalog setupCatalog()
  {
    RESTCatalog restCatalog = new RESTCatalog(
        SessionCatalog.SessionContext.createEmpty(),
        config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()
    );
    restCatalog.setConf(configuration);
    catalogProperties.put(CatalogProperties.URI, catalogUri);
    restCatalog.initialize("rest", catalogProperties);
    return restCatalog;
  }
}
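A hedged usage sketch, not part of the commit (the endpoint and the `analytics` namespace are placeholders): `retrieveCatalog()` builds the `RESTCatalog` lazily on first call, after which it behaves like any other Iceberg `Catalog`.

```java
import java.util.HashMap;
import java.util.List;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

// Sketch: exercising the new catalog type outside of an ingestion spec.
public class RestCatalogUsageSketch
{
  public static void main(String[] args)
  {
    RestIcebergCatalog druidCatalog = new RestIcebergCatalog(
        "http://localhost:8181",   // placeholder REST endpoint
        new HashMap<>(),
        new DefaultObjectMapper(),
        new Configuration()
    );
    Catalog catalog = druidCatalog.retrieveCatalog(); // builds the RESTCatalog lazily
    List<TableIdentifier> tables = catalog.listTables(Namespace.of("analytics")); // placeholder namespace
    tables.forEach(System.out::println);
  }
}
```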
@@ -0,0 +1,99 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.iceberg.input;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.net.HttpHeaders;
import com.sun.net.httpserver.HttpServer;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.rest.RESTCatalog;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

public class RestCatalogTest
{
  private final ObjectMapper mapper = new DefaultObjectMapper();
  private int port = 0;
  private HttpServer server = null;
  private ServerSocket serverSocket = null;

  @Before
  public void setup() throws Exception
  {
    serverSocket = new ServerSocket(0);
    port = serverSocket.getLocalPort();
    serverSocket.close();
    server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
    server.createContext(
        "/v1/config", // API for catalog fetchConfig which is invoked on catalog initialization
        (httpExchange) -> {
          String payload = "{}";
          byte[] outputBytes = payload.getBytes(StandardCharsets.UTF_8);
          httpExchange.sendResponseHeaders(200, outputBytes.length);
          OutputStream os = httpExchange.getResponseBody();
          httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
          httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_LENGTH, String.valueOf(outputBytes.length));
          httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_RANGE, "bytes 0");
          os.write(outputBytes);
          os.close();
        }
    );
    server.start();
  }

  @Test
  public void testCatalogCreate()
  {
    String catalogUri = "http://localhost:" + port;

    RestIcebergCatalog testRestCatalog = new RestIcebergCatalog(
        catalogUri,
        new HashMap<>(),
        mapper,
        new Configuration()
    );
    RESTCatalog innerCatalog = (RESTCatalog) testRestCatalog.retrieveCatalog();

    Assert.assertEquals("rest", innerCatalog.name());
    Assert.assertNotNull(innerCatalog.properties());
    Assert.assertNotNull(testRestCatalog.getCatalogProperties());
    Assert.assertEquals(testRestCatalog.getCatalogUri(), innerCatalog.properties().get("uri"));
  }

  @After
  public void tearDown() throws IOException
  {
    if (server != null) {
      server.stop(0);
    }
    if (serverSocket != null) {
      serverSocket.close();
    }
  }
}
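One hedged note on the test's port handling: grabbing a port with `new ServerSocket(0)`, closing it, and re-binding leaves a small race window where another process can claim the port. A sketch of an alternative using the same `com.sun.net.httpserver` API, binding once and reading the port back:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;

// Sketch: bind directly to an ephemeral port and read it back, avoiding the
// close-then-rebind race in setup() above.
public class EphemeralPortSketch
{
  public static HttpServer startOnFreePort() throws IOException
  {
    HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
    int port = server.getAddress().getPort(); // the actual bound port
    System.out.println("listening on " + port);
    server.start();
    return server;
  }
}
```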