NIFI-11334: Fixed PutIceberg processor instance interference due to same class loader usage

This closes #7449.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Mark Bathori 2023-06-29 08:59:43 +02:00 committed by Peter Turcsanyi
parent 5f4cc106f1
commit b1be71f918
GPG Key ID: 55A813F1C3E553DC
23 changed files with 697 additions and 525 deletions
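The fix applies NiFi's instance class loading pattern to the Iceberg processors: the base processor requests a cloned class loader per instance and keys isolation on the configured Kerberos User Service, so separate PutIceberg instances no longer share static Hive/Hadoop state. A minimal sketch of that pattern, assuming illustrative processor and property names that are not part of the commit itself:

import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;

// Each instance (or set of instances sharing an isolation key) gets its own copy of the NAR class loader.
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public abstract class ExampleIsolatedProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider {

    // Illustrative stand-in for AbstractIcebergProcessor's KERBEROS_USER_SERVICE property
    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
            .name("kerberos-user-service")
            .displayName("Kerberos User Service")
            .identifiesControllerService(KerberosUserService.class)
            .build();

    @Override
    public String getClassloaderIsolationKey(final PropertyContext context) {
        // Instances configured with the same Kerberos User Service may share a class loader;
        // otherwise the key is null and each instance stays fully isolated.
        final KerberosUserService kerberosUserService =
                context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
        return kerberosUserService == null ? null : kerberosUserService.getIdentifier();
    }
}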

View File

@ -43,101 +43,165 @@
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- Provided through nifi-iceberg-services-api -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-registry</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<scope>provided</scope>
<classifier>nohive</classifier>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<profiles>
<!-- Includes hadoop-aws for accessing HDFS with an s3a:// filesystem -->
<profile>
<id>include-hadoop-aws</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-azure and hadoop-azure-datalake for accessing HDFS with wasb://, abfs://, and adl:// filesystems -->
<profile>
<id>include-hadoop-azure</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure-datalake</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-cloud-storage -->
<profile>
<id>include-hadoop-cloud-storage</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cloud-storage</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-ozone for o3fs:// file system -->
<profile>
<id>include-hadoop-ozone</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>ozone-client</artifactId>
<version>${ozone.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>ozone-filesystem</artifactId>
<version>${ozone.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-gcp for accessing HDFS with an gcs:// filesystem -->
<profile>
<id>include-hadoop-gcp</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>hadoop3-${gcs.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util</artifactId>
<version>${gcs.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util-hadoop</artifactId>
<version>hadoop3-${gcs.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcsio</artifactId>
<version>${gcs.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -74,13 +74,27 @@
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
@ -171,18 +185,100 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-api</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-core</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-hbase-compat-1.0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

View File

@ -18,9 +18,12 @@
package org.apache.nifi.processors.iceberg;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
@ -36,11 +39,13 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
/**
* Base Iceberg processor class.
*/
public abstract class AbstractIcebergProcessor extends AbstractProcessor {
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public abstract class AbstractIcebergProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
.name("catalog-service")
@ -66,14 +71,14 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
private volatile UserGroupInformation ugi;
@OnScheduled
public final void onScheduled(final ProcessContext context) {
public void onScheduled(final ProcessContext context) {
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
this.kerberosUser = kerberosUserService.createKerberosUser();
try {
this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser);
this.ugi = getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser);
} catch (IOException e) {
throw new ProcessException("Kerberos Authentication failed", e);
}
@ -81,7 +86,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
}
@OnStopped
public final void onStopped() {
public void onStopped() {
if (kerberosUser != null) {
try {
kerberosUser.logout();
@ -117,6 +122,15 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor {
}
}
@Override
public String getClassloaderIsolationKey(PropertyContext context) {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
return kerberosUserService.getIdentifier();
}
return null;
}
private UserGroupInformation getUgi() {
try {
kerberosUser.checkTGTAndRelogin();

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.util.List;
public class IcebergUtils {
/**
* Loads configuration files from the provided paths.
*
* @param configFilePaths list of Hadoop configuration file paths
* @return merged configuration
*/
public static Configuration getConfigurationFromFiles(List<String> configFilePaths) {
final Configuration conf = new Configuration();
if (configFilePaths != null) {
for (final String configFile : configFilePaths) {
conf.addResource(new Path(configFile.trim()));
}
}
return conf;
}
}
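As a usage sketch for the new helper (the file paths below are hypothetical), getConfigurationFromFiles merges any number of Hadoop XML resources into a single Configuration, the same way the processor and catalog factory call it with IcebergCatalogService.getConfigFilePaths():

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;

import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;

public class IcebergUtilsUsageExample {

    public static void main(String[] args) {
        // Hypothetical config file locations; in the processor these come from
        // IcebergCatalogService.getConfigFilePaths()
        final List<String> configFilePaths = Arrays.asList(
                "/etc/hadoop/conf/core-site.xml",
                "/etc/hadoop/conf/hdfs-site.xml");

        // Resources are merged in order into one Configuration
        final Configuration configuration = getConfigurationFromFiles(configFilePaths);
        System.out.println(configuration.get("hadoop.security.authentication", "simple"));
    }
}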

View File

@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
import org.apache.nifi.serialization.RecordReader;
@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " +
@ -208,7 +210,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
if (catalogServiceEnabled) {
final boolean kerberosUserServiceIsSet = context.getProperty(KERBEROS_USER_SERVICE).isSet();
final boolean securityEnabled = SecurityUtil.isSecurityEnabled(catalogService.getConfiguration());
final boolean securityEnabled = SecurityUtil.isSecurityEnabled(getConfigurationFromFiles(catalogService.getConfigFilePaths()));
if (securityEnabled && !kerberosUserServiceIsSet) {
problems.add(new ValidationResult.Builder()
@ -293,7 +295,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Catalog catalog = catalogService.getCatalog();
final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
final Catalog catalog = catalogFactory.create();
final Namespace namespace = Namespace.of(catalogNamespace);
final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName);

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg.catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import java.util.HashMap;
import java.util.Map;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
public class IcebergCatalogFactory {
private final IcebergCatalogService catalogService;
public IcebergCatalogFactory(IcebergCatalogService catalogService) {
this.catalogService = catalogService;
}
public Catalog create() {
switch (catalogService.getCatalogType()) {
case HIVE:
return initHiveCatalog(catalogService);
case HADOOP:
return initHadoopCatalog(catalogService);
default:
throw new IllegalArgumentException("Unknown catalog type: " + catalogService.getCatalogType());
}
}
private Catalog initHiveCatalog(IcebergCatalogService catalogService) {
HiveCatalog catalog = new HiveCatalog();
if (catalogService.getConfigFilePaths() != null) {
final Configuration configuration = getConfigurationFromFiles(catalogService.getConfigFilePaths());
catalog.setConf(configuration);
}
final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
final Map<String, String> properties = new HashMap<>();
if (catalogProperties.containsKey(METASTORE_URI)) {
properties.put(CatalogProperties.URI, catalogProperties.get(METASTORE_URI));
}
if (catalogProperties.containsKey(WAREHOUSE_LOCATION)) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, catalogProperties.get(WAREHOUSE_LOCATION));
}
catalog.initialize("hive-catalog", properties);
return catalog;
}
private Catalog initHadoopCatalog(IcebergCatalogService catalogService) {
final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
final String warehousePath = catalogProperties.get(WAREHOUSE_LOCATION);
if (catalogService.getConfigFilePaths() != null) {
return new HadoopCatalog(getConfigurationFromFiles(catalogService.getConfigFilePaths()), warehousePath);
} else {
return new HadoopCatalog(new Configuration(), warehousePath);
}
}
}
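A usage sketch for the new factory, assuming the catalog service reference comes from the processor's CATALOG property and using placeholder namespace and table names: catalog construction now happens inside the processor's isolated class loader, from the service's declarative configuration, rather than inside the controller service.

import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.services.iceberg.IcebergCatalogService;

public class IcebergCatalogFactoryUsage {

    // catalogService is assumed to be obtained elsewhere, e.g. from the processor's CATALOG property
    public static Catalog openCatalog(final IcebergCatalogService catalogService) {
        final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
        final Catalog catalog = catalogFactory.create();

        // Same namespace/table lookup pattern the PutIceberg change above uses
        final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("default"), "date");
        catalog.loadTable(tableIdentifier);
        return catalog;
    }
}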

View File

@ -33,6 +33,7 @@ import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
@ -193,7 +194,8 @@ public class TestDataFileActions {
private Table initCatalog() throws IOException {
TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
Catalog catalog = catalogService.getCatalog();
IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
Catalog catalog = catalogFactory.create();
return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.nifi.processors.iceberg;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
import org.apache.nifi.reporting.InitializationException;
@ -27,6 +26,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -56,8 +58,8 @@ public class TestPutIcebergCustomValidation {
runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME);
}
private void initCatalogService(Configuration configuration) throws InitializationException {
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfig(configuration).build();
private void initCatalogService(List<String> configFilePaths) throws InitializationException {
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfigFilePaths(configFilePaths).build();
runner.addControllerService(CATALOG_SERVICE_NAME, catalogService);
runner.enableControllerService(catalogService);
@ -78,10 +80,7 @@ public class TestPutIcebergCustomValidation {
@Test
public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
initRecordReader();
Configuration config = new Configuration();
config.set("hadoop.security.authentication", "kerberos");
initCatalogService(config);
initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@ -91,10 +90,7 @@ public class TestPutIcebergCustomValidation {
@Test
public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
initRecordReader();
Configuration config = new Configuration();
config.set("hadoop.security.authentication", "kerberos");
initCatalogService(config);
initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
initKerberosUserService();
@ -106,8 +102,7 @@ public class TestPutIcebergCustomValidation {
@Test
public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
initRecordReader();
initCatalogService(new Configuration());
initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@ -117,8 +112,7 @@ public class TestPutIcebergCustomValidation {
@Test
public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException {
initRecordReader();
initCatalogService(new Configuration());
initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
initKerberosUserService();

View File

@ -27,6 +27,7 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
@ -62,6 +63,7 @@ public class TestPutIcebergWithHadoopCatalog {
private TestRunner runner;
private PutIceberg processor;
private Schema inputSchema;
private Catalog catalog;
private static final Namespace NAMESPACE = Namespace.of("default");
@ -100,9 +102,10 @@ public class TestPutIcebergWithHadoopCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
}
private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
Catalog catalog = catalogService.getCatalog();
IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
catalog = catalogFactory.create();
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
@ -114,8 +117,6 @@ public class TestPutIcebergWithHadoopCatalog {
runner.enableControllerService(catalogService);
runner.setProperty(PutIceberg.CATALOG, "catalog-service");
return catalog;
}
@DisabledOnOs(WINDOWS)
@ -128,7 +129,7 @@ public class TestPutIcebergWithHadoopCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
Catalog catalog = initCatalog(spec, fileFormat);
initCatalog(spec, fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
@ -156,7 +157,7 @@ public class TestPutIcebergWithHadoopCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
Catalog catalog = initCatalog(spec, fileFormat);
initCatalog(spec, fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
@ -185,7 +186,7 @@ public class TestPutIcebergWithHadoopCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
Catalog catalog = initCatalog(spec, fileFormat);
initCatalog(spec, fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);

View File

@ -22,12 +22,14 @@ import org.apache.commons.io.IOUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
import org.apache.nifi.reporting.InitializationException;
@ -66,7 +68,7 @@ public class TestPutIcebergWithHiveCatalog {
private TestRunner runner;
private PutIceberg processor;
private Schema inputSchema;
private TestHiveCatalogService catalogService;
private Catalog catalog;
@RegisterExtension
public static ThriftMetastore metastore = new ThriftMetastore();
@ -90,16 +92,11 @@ public class TestPutIcebergWithHiveCatalog {
inputSchema = new Schema.Parser().parse(avroSchema);
processor = new PutIceberg();
catalogService = new TestHiveCatalogService.Builder()
.withMetastoreUri(metastore.getThriftConnectionUri())
.withWarehouseLocation(metastore.getWarehouseLocation())
.build();
}
@AfterEach
public void tearDown() {
catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
catalog.dropTable(TABLE_IDENTIFIER);
}
private void initRecordReader() throws InitializationException {
@ -126,7 +123,15 @@ public class TestPutIcebergWithHiveCatalog {
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
.withMetastoreUri(metastore.getThriftConnectionUri())
.withWarehouseLocation(metastore.getWarehouseLocation())
.build();
IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
catalog = catalogFactory.create();
catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
runner.addControllerService("catalog-service", catalogService);
runner.enableControllerService(catalogService);
@ -150,7 +155,7 @@ public class TestPutIcebergWithHiveCatalog {
runner.enqueue(new byte[0]);
runner.run();
Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
Table table = catalog.loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")
@ -187,7 +192,7 @@ public class TestPutIcebergWithHiveCatalog {
runner.enqueue(new byte[0]);
runner.run();
Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
Table table = catalog.loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")
@ -225,7 +230,7 @@ public class TestPutIcebergWithHiveCatalog {
runner.enqueue(new byte[0]);
runner.run();
Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
Table table = catalog.loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")
@ -266,7 +271,7 @@ public class TestPutIcebergWithHiveCatalog {
runner.enqueue(new byte[0], attributes);
runner.run();
Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
Table table = catalog.loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")

View File

@ -17,35 +17,40 @@
*/
package org.apache.nifi.processors.iceberg.catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.apache.nifi.services.iceberg.IcebergCatalogType;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.nio.file.Files.createTempDirectory;
public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
private final HadoopCatalog catalog;
private final Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
public TestHadoopCatalogService() throws IOException {
File warehouseLocation = createTempDirectory("metastore").toFile();
catalog = new HadoopCatalog(new Configuration(), warehouseLocation.getAbsolutePath());
catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, warehouseLocation.getAbsolutePath());
}
@Override
public Catalog getCatalog() {
return catalog;
public IcebergCatalogType getCatalogType() {
return IcebergCatalogType.HADOOP;
}
@Override
public Configuration getConfiguration() {
return catalog.getConf();
public Map<IcebergCatalogProperty, String> getCatalogProperties() {
return catalogProperties;
}
@Override
public List<String> getConfigFilePaths() {
return null;
}
}

View File

@ -17,28 +17,47 @@
*/
package org.apache.nifi.processors.iceberg.catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.apache.nifi.services.iceberg.IcebergCatalogType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
private final HiveCatalog catalog;
private final List<String> configFilePaths;
private final Map<IcebergCatalogProperty, String> catalogProperties;
public TestHiveCatalogService(HiveCatalog catalog) {
this.catalog = catalog;
public TestHiveCatalogService(Map<IcebergCatalogProperty, String> catalogProperties, List<String> configFilePaths) {
this.catalogProperties = catalogProperties;
this.configFilePaths = configFilePaths;
}
@Override
public IcebergCatalogType getCatalogType() {
return IcebergCatalogType.HIVE;
}
@Override
public Map<IcebergCatalogProperty, String> getCatalogProperties() {
return catalogProperties;
}
@Override
public List<String> getConfigFilePaths() {
return configFilePaths;
}
public static class Builder {
private String metastoreUri;
private String warehouseLocation;
private Configuration config;
private List<String> configFilePaths;
public Builder withMetastoreUri(String metastoreUri) {
this.metastoreUri = metastoreUri;
@ -50,40 +69,23 @@ public class TestHiveCatalogService extends AbstractControllerService implements
return this;
}
public Builder withConfig(Configuration config) {
this.config = config;
public Builder withConfigFilePaths(List<String> configFilePaths) {
this.configFilePaths = configFilePaths;
return this;
}
public TestHiveCatalogService build() {
HiveCatalog catalog = new HiveCatalog();
Map<String, String> properties = new HashMap<>();
Map<IcebergCatalogProperty, String> properties = new HashMap<>();
if (metastoreUri != null) {
properties.put(CatalogProperties.URI, metastoreUri);
properties.put(METASTORE_URI, metastoreUri);
}
if (warehouseLocation != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
properties.put(WAREHOUSE_LOCATION, warehouseLocation);
}
if (config != null) {
catalog.setConf(config);
}
catalog.initialize("hive-catalog", properties);
return new TestHiveCatalogService(catalog);
return new TestHiveCatalogService(properties, configFilePaths);
}
}
@Override
public Catalog getCatalog() {
return catalog;
}
@Override
public Configuration getConfiguration() {
return catalog.getConf();
}
}

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
</configuration>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
</configuration>

View File

@ -37,166 +37,4 @@
<type>nar</type>
</dependency>
</dependencies>
<profiles>
<!-- Includes hadoop-aws for accessing HDFS with an s3a:// filesystem -->
<profile>
<id>include-hadoop-aws</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-azure and hadoop-azure-datalake for accessing HDFS with wasb://, abfs://, and adl:// filesystems -->
<profile>
<id>include-hadoop-azure</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure-datalake</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-cloud-storage -->
<profile>
<id>include-hadoop-cloud-storage</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-cloud-storage</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-ozone for o3fs:// file system -->
<profile>
<id>include-hadoop-ozone</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>ozone-client</artifactId>
<version>${ozone.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>ozone-filesystem</artifactId>
<version>${ozone.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
</dependency>
</dependencies>
</profile>
<!-- Includes hadoop-gcp for accessing HDFS with an gcs:// filesystem -->
<profile>
<id>include-hadoop-gcp</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>hadoop3-${gcs.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util</artifactId>
<version>${gcs.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util-hadoop</artifactId>
<version>hadoop3-${gcs.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcsio</artifactId>
<version>${gcs.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -31,120 +31,5 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<!-- External dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-api</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-core</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-hbase-compat-1.0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.iceberg;
public enum IcebergCatalogProperty {
METASTORE_URI("hive.metastore.uris"),
WAREHOUSE_LOCATION("hive.metastore.warehouse.dir");
private final String hadoopPropertyName;
IcebergCatalogProperty(String hadoopPropertyName) {
this.hadoopPropertyName = hadoopPropertyName;
}
public String getHadoopPropertyName() {
return hadoopPropertyName;
}
}
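A small sketch of how the enum's Hadoop property names line up with the keys that the HiveCatalogService changes below scan for in the supplied *-site.xml resources:

import org.apache.nifi.services.iceberg.IcebergCatalogProperty;

public class IcebergCatalogPropertyExample {

    public static void main(String[] args) {
        // Prints METASTORE_URI -> hive.metastore.uris and WAREHOUSE_LOCATION -> hive.metastore.warehouse.dir,
        // the configuration keys checked during custom validation when only config resources are set.
        for (IcebergCatalogProperty property : IcebergCatalogProperty.values()) {
            System.out.println(property.name() + " -> " + property.getHadoopPropertyName());
        }
    }
}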

View File

@ -17,16 +17,19 @@
*/
package org.apache.nifi.services.iceberg;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.nifi.controller.ControllerService;
import java.util.List;
import java.util.Map;
/**
* Provides a basic connector to Iceberg catalog services.
*/
public interface IcebergCatalogService extends ControllerService {
Catalog getCatalog();
IcebergCatalogType getCatalogType();
Configuration getConfiguration();
Map<IcebergCatalogProperty, String> getCatalogProperties();
List<String> getConfigFilePaths();
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.iceberg;
public enum IcebergCatalogType {
HIVE,
HADOOP
}

View File

@ -36,6 +36,11 @@
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-xml-processing</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -17,14 +17,24 @@
*/
package org.apache.nifi.services.iceberg;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
import org.w3c.dom.Document;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -32,7 +42,9 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
*/
public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService {
protected Configuration configuration = new Configuration();
protected Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
protected List<String> configFilePaths;
static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("hadoop-config-resources")
@ -44,24 +56,38 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
.dynamicallyModifiesClasspath(true)
.build();
/**
* Loads configuration files from the provided paths.
*
* @param configFiles list of config file paths separated with comma
* @return merged configuration
*/
protected Configuration getConfigurationFromFiles(String configFiles) {
final Configuration conf = new Configuration();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
conf.addResource(new Path(configFile.trim()));
protected List<Document> parseConfigFilePaths(String configFilePaths) {
List<Document> documentList = new ArrayList<>();
for (final String configFile : createFilePathList(configFilePaths)) {
File file = new File(configFile.trim());
try (final InputStream fis = new FileInputStream(file);
final InputStream in = new BufferedInputStream(fis)) {
final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
documentList.add(documentProvider.parse(in));
} catch (IOException e) {
throw new ProcessException("Failed to load config files", e);
}
}
return conf;
return documentList;
}
protected List<String> createFilePathList(String configFilePaths) {
List<String> filePathList = new ArrayList<>();
if (configFilePaths != null && !configFilePaths.trim().isEmpty()) {
for (final String configFile : configFilePaths.split(",")) {
filePathList.add(configFile.trim());
}
}
return filePathList;
}
@Override
public Configuration getConfiguration() {
return configuration;
public Map<IcebergCatalogProperty, String> getCatalogProperties() {
return catalogProperties;
}
@Override
public List<String> getConfigFilePaths() {
return configFilePaths;
}
}

View File

@ -17,20 +17,20 @@
*/
package org.apache.nifi.services.iceberg;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
@Tags({"iceberg", "catalog", "service", "hadoop", "hdfs"})
@CapabilityDescription("Catalog service that can use HDFS or similar file systems that support atomic rename.")
public class HadoopCatalogService extends AbstractCatalogService {
@ -39,6 +39,7 @@ public class HadoopCatalogService extends AbstractCatalogService {
.name("warehouse-path")
.displayName("Warehouse Path")
.description("Path to the location of the warehouse.")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@ -53,25 +54,18 @@ public class HadoopCatalogService extends AbstractCatalogService {
return PROPERTIES;
}
private HadoopCatalog catalog;
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String warehousePath = context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue();
if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
configuration = getConfigurationFromFiles(configFiles);
catalog = new HadoopCatalog(configuration, warehousePath);
} else {
catalog = new HadoopCatalog(new Configuration(), warehousePath);
configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
}
catalogProperties.put(WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue());
}
@Override
public Catalog getCatalog() {
return catalog;
public IcebergCatalogType getCatalogType() {
return IcebergCatalogType.HADOOP;
}
}

View File

@ -17,10 +17,6 @@
*/
package org.apache.nifi.services.iceberg;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -30,14 +26,14 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Tags({"iceberg", "catalog", "service", "metastore", "hive"})
@CapabilityDescription("Catalog service that connects to a Hive metastore to keep track of Iceberg tables.")
@ -47,7 +43,7 @@ public class HiveCatalogService extends AbstractCatalogService {
.name("hive-metastore-uri")
.displayName("Hive Metastore URI")
.description("The URI location(s) for the Hive metastore; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.URI_LIST_VALIDATOR)
.build();
@ -55,6 +51,7 @@ public class HiveCatalogService extends AbstractCatalogService {
.name("warehouse-location")
.displayName("Default Warehouse Location")
.description("Location of default database for the warehouse. This field sets or overrides the 'hive.metastore.warehouse.dir' configuration property.")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
@ -69,14 +66,12 @@ public class HiveCatalogService extends AbstractCatalogService {
return PROPERTIES;
}
private HiveCatalog catalog;
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>();
String configMetastoreUri = null;
String configWarehouseLocation = null;
boolean configMetastoreUriPresent = false;
boolean configWarehouseLocationPresent = false;
final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
@ -84,13 +79,30 @@ public class HiveCatalogService extends AbstractCatalogService {
// Load the configurations for validation only if any config resource is provided and if either the metastore URI or the warehouse location property is missing
if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet() && (propertyMetastoreUri == null || propertyWarehouseLocation == null)) {
final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
final List<Document> documents = parseConfigFilePaths(configFiles);
Configuration configuration = getConfigurationFromFiles(configFiles);
configMetastoreUri = configuration.get("hive.metastore.uris");
configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
for (Document document : documents) {
final NodeList nameNodeList = document.getElementsByTagName("name");
for (int i = 0; i < nameNodeList.getLength(); i++) {
final String nodeValue = nameNodeList.item(i).getFirstChild().getNodeValue();
if (nodeValue.equals(IcebergCatalogProperty.METASTORE_URI.getHadoopPropertyName())) {
configMetastoreUriPresent = true;
}
if (nodeValue.equals(IcebergCatalogProperty.WAREHOUSE_LOCATION.getHadoopPropertyName())) {
configWarehouseLocationPresent = true;
}
if (configMetastoreUriPresent && configWarehouseLocationPresent) {
break;
}
}
}
}
if (configMetastoreUri == null && propertyMetastoreUri == null) {
if (!configMetastoreUriPresent && propertyMetastoreUri == null) {
problems.add(new ValidationResult.Builder()
.subject("Hive Metastore URI")
.valid(false)
@ -99,7 +111,7 @@ public class HiveCatalogService extends AbstractCatalogService {
.build());
}
if (configWarehouseLocation == null && propertyWarehouseLocation == null) {
if (!configWarehouseLocationPresent && propertyWarehouseLocation == null) {
problems.add(new ValidationResult.Builder()
.subject("Default Warehouse Location")
.valid(false)
@ -113,29 +125,21 @@ public class HiveCatalogService extends AbstractCatalogService {
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
catalog = new HiveCatalog();
Map<String, String> properties = new HashMap<>();
if (context.getProperty(METASTORE_URI).isSet()) {
properties.put(CatalogProperties.URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
catalogProperties.put(IcebergCatalogProperty.METASTORE_URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(WAREHOUSE_LOCATION).isSet()) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
configuration = getConfigurationFromFiles(configFiles);
catalog.setConf(configuration);
configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
}
catalog.initialize("hive-catalog", properties);
}
@Override
public Catalog getCatalog() {
return catalog;
public IcebergCatalogType getCatalogType() {
return IcebergCatalogType.HIVE;
}
}