From b1be71f918e45497099b069d04482bde8aff025d Mon Sep 17 00:00:00 2001 From: Mark Bathori Date: Thu, 29 Jun 2023 08:59:43 +0200 Subject: [PATCH] NIFI-11334: Fixed PutIceberg processor instance interference due to same class loader usage This closes #7449. Signed-off-by: Peter Turcsanyi --- .../nifi-iceberg-processors-nar/pom.xml | 258 +++++++++++------- .../nifi-iceberg-processors/pom.xml | 102 ++++++- .../iceberg/AbstractIcebergProcessor.java | 22 +- .../nifi/processors/iceberg/IcebergUtils.java | 42 +++ .../nifi/processors/iceberg/PutIceberg.java | 7 +- .../catalog/IcebergCatalogFactory.java | 87 ++++++ .../iceberg/TestDataFileActions.java | 4 +- .../TestPutIcebergCustomValidation.java | 24 +- .../TestPutIcebergWithHadoopCatalog.java | 15 +- .../TestPutIcebergWithHiveCatalog.java | 29 +- .../catalog/TestHadoopCatalogService.java | 25 +- .../catalog/TestHiveCatalogService.java | 64 ++--- .../src/test/resources/secured-core-site.xml | 22 ++ .../test/resources/unsecured-core-site.xml | 22 ++ .../nifi-iceberg-services-api-nar/pom.xml | 162 ----------- .../nifi-iceberg-services-api/pom.xml | 115 -------- .../iceberg/IcebergCatalogProperty.java | 35 +++ .../iceberg/IcebergCatalogService.java | 11 +- .../services/iceberg/IcebergCatalogType.java | 23 ++ .../nifi-iceberg-services/pom.xml | 5 + .../iceberg/AbstractCatalogService.java | 62 +++-- .../iceberg/HadoopCatalogService.java | 24 +- .../services/iceberg/HiveCatalogService.java | 62 +++-- 23 files changed, 697 insertions(+), 525 deletions(-) create mode 100644 nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java create mode 100644 nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java create mode 100644 nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml create mode 100644 nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml create mode 100644 nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java create mode 100644 nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml index 3c7e102bca..4ed687eb42 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors-nar/pom.xml @@ -43,101 +43,165 @@ - - - - - org.apache.iceberg - iceberg-core - provided - - - org.apache.hive - hive-shims - provided - - - org.codehaus.groovy - groovy-all - provided - - - org.apache.hadoop - hadoop-common - provided - - - org.slf4j - slf4j-reload4j - - - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-registry - provided - - - org.apache.zookeeper - zookeeper - provided - - - org.apache.curator - curator-client - provided - - - org.apache.curator - curator-framework - provided - - - com.fasterxml.jackson.core - jackson-databind - provided - - - com.fasterxml.jackson.core - jackson-core - provided - - - com.fasterxml.jackson.core - jackson-annotations - provided - - - org.xerial.snappy - snappy-java - provided - - - org.apache.ant - ant - provided - - - org.apache.ivy - ivy - provided - - - org.apache.orc - orc-core - provided - nohive - - - org.apache.parquet - parquet-avro - provided - - - + + + + include-hadoop-aws + + false + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + + + + include-hadoop-azure + + false + + + + org.apache.hadoop + hadoop-azure + ${hadoop.version} + + + com.google.guava + guava + + + com.fasterxml.jackson.core + jackson-core + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-azure-datalake + ${hadoop.version} + + + com.fasterxml.jackson.core + jackson-core + + + + + + + + include-hadoop-cloud-storage + + false + + + + org.apache.hadoop + hadoop-cloud-storage + ${hadoop.version} + + + log4j + log4j + + + org.apache.logging.log4j + log4j-core + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + + + + include-hadoop-ozone + + false + + + + org.apache.ozone + ozone-client + ${ozone.version} + + + commons-logging + commons-logging + + + org.apache.logging.log4j + log4j-core + + + org.bouncycastle + bcprov-jdk15on + + + org.bouncycastle + bcpkix-jdk15on + + + + + org.apache.ozone + ozone-filesystem + ${ozone.version} + + + org.bouncycastle + bcprov-jdk18on + + + org.bouncycastle + bcpkix-jdk18on + + + + + + include-hadoop-gcp + + false + + + + com.google.cloud.bigdataoss + gcs-connector + hadoop3-${gcs.version} + + + com.google.cloud.bigdataoss + util + ${gcs.version} + + + com.google.cloud.bigdataoss + util-hadoop + hadoop3-${gcs.version} + + + com.google.cloud.bigdataoss + gcsio + ${gcs.version} + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml index 61d40d0365..96e67ce03f 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml @@ -74,13 +74,27 @@ org.apache.iceberg iceberg-core ${iceberg.version} - provided + + + org.apache.iceberg + iceberg-hive-metastore + ${iceberg.version} org.apache.iceberg iceberg-data ${iceberg.version} + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + + + org.apache.iceberg + iceberg-orc + ${iceberg.version} + org.apache.hive hive-exec @@ -171,18 +185,100 @@ org.apache.hadoop - hadoop-hdfs + hadoop-client ${hadoop.version} - provided log4j log4j + + org.slf4j + slf4j-log4j12 + + + org.slf4j + slf4j-reload4j + commons-logging commons-logging + + javax.servlet + javax.servlet-api + + + + + org.apache.hive + hive-metastore + ${hive.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-web + + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.orc + orc-core + + + org.apache.hbase + hbase-client + + + co.cask.tephra + tephra-api + + + co.cask.tephra + tephra-core + + + co.cask.tephra + tephra-hbase-compat-1.0 + + + org.apache.parquet + parquet-hadoop-bundle + + + com.tdunning + json + + + com.zaxxer + HikariCP + + + com.google.guava + guava + diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java index 9f527344ec..3352959d17 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java @@ -18,9 +18,12 @@ package org.apache.nifi.processors.iceberg; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.ClassloaderIsolationKeyProvider; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.processor.AbstractProcessor; @@ -36,11 +39,13 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser; +import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; /** * Base Iceberg processor class. */ -public abstract class AbstractIcebergProcessor extends AbstractProcessor { +@RequiresInstanceClassLoading(cloneAncestorResources = true) +public abstract class AbstractIcebergProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider { public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder() .name("catalog-service") @@ -66,14 +71,14 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor { private volatile UserGroupInformation ugi; @OnScheduled - public final void onScheduled(final ProcessContext context) { + public void onScheduled(final ProcessContext context) { final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class); final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); if (kerberosUserService != null) { this.kerberosUser = kerberosUserService.createKerberosUser(); try { - this.ugi = getUgiForKerberosUser(catalogService.getConfiguration(), kerberosUser); + this.ugi = getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser); } catch (IOException e) { throw new ProcessException("Kerberos Authentication failed", e); } @@ -81,7 +86,7 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor { } @OnStopped - public final void onStopped() { + public void onStopped() { if (kerberosUser != null) { try { kerberosUser.logout(); @@ -117,6 +122,15 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor { } } + @Override + public String getClassloaderIsolationKey(PropertyContext context) { + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + if (kerberosUserService != null) { + return kerberosUserService.getIdentifier(); + } + return null; + } + private UserGroupInformation getUgi() { try { kerberosUser.checkTGTAndRelogin(); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java new file mode 100644 index 0000000000..7a3db7de71 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.iceberg; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.util.List; + +public class IcebergUtils { + + /** + * Loads configuration files from the provided paths. + * + * @param configFilePaths list of config file paths separated with comma + * @return merged configuration + */ + public static Configuration getConfigurationFromFiles(List configFilePaths) { + final Configuration conf = new Configuration(); + if (configFilePaths != null) { + for (final String configFile : configFilePaths) { + conf.addResource(new Path(configFile.trim())); + } + } + return conf; + } +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index 20f67093ab..360ea17f1b 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter; import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory; import org.apache.nifi.serialization.RecordReader; @@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; @Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"}) @CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " + @@ -208,7 +210,7 @@ public class PutIceberg extends AbstractIcebergProcessor { if (catalogServiceEnabled) { final boolean kerberosUserServiceIsSet = context.getProperty(KERBEROS_USER_SERVICE).isSet(); - final boolean securityEnabled = SecurityUtil.isSecurityEnabled(catalogService.getConfiguration()); + final boolean securityEnabled = SecurityUtil.isSecurityEnabled(getConfigurationFromFiles(catalogService.getConfigFilePaths())); if (securityEnabled && !kerberosUserServiceIsSet) { problems.add(new ValidationResult.Builder() @@ -293,7 +295,8 @@ public class PutIceberg extends AbstractIcebergProcessor { final String catalogNamespace = context.getProperty(CATALOG_NAMESPACE).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final Catalog catalog = catalogService.getCatalog(); + final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService); + final Catalog catalog = catalogFactory.create(); final Namespace namespace = Namespace.of(catalogNamespace); final TableIdentifier tableIdentifier = TableIdentifier.of(namespace, tableName); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java new file mode 100644 index 0000000000..1b1e058090 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.iceberg.catalog; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.nifi.services.iceberg.IcebergCatalogProperty; +import org.apache.nifi.services.iceberg.IcebergCatalogService; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles; +import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI; +import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION; + +public class IcebergCatalogFactory { + + private final IcebergCatalogService catalogService; + + public IcebergCatalogFactory(IcebergCatalogService catalogService) { + this.catalogService = catalogService; + } + + public Catalog create() { + switch (catalogService.getCatalogType()) { + case HIVE: + return initHiveCatalog(catalogService); + case HADOOP: + return initHadoopCatalog(catalogService); + default: + throw new IllegalArgumentException("Unknown catalog type: " + catalogService.getCatalogType()); + } + } + + private Catalog initHiveCatalog(IcebergCatalogService catalogService) { + HiveCatalog catalog = new HiveCatalog(); + + if (catalogService.getConfigFilePaths() != null) { + final Configuration configuration = getConfigurationFromFiles(catalogService.getConfigFilePaths()); + catalog.setConf(configuration); + } + + final Map catalogProperties = catalogService.getCatalogProperties(); + final Map properties = new HashMap<>(); + + if (catalogProperties.containsKey(METASTORE_URI)) { + properties.put(CatalogProperties.URI, catalogProperties.get(METASTORE_URI)); + } + + if (catalogProperties.containsKey(WAREHOUSE_LOCATION)) { + properties.put(CatalogProperties.WAREHOUSE_LOCATION, catalogProperties.get(WAREHOUSE_LOCATION)); + } + + catalog.initialize("hive-catalog", properties); + return catalog; + } + + private Catalog initHadoopCatalog(IcebergCatalogService catalogService) { + final Map catalogProperties = catalogService.getCatalogProperties(); + final String warehousePath = catalogProperties.get(WAREHOUSE_LOCATION); + + if (catalogService.getConfigFilePaths() != null) { + return new HadoopCatalog(getConfigurationFromFiles(catalogService.getConfigFilePaths()), warehousePath); + } else { + return new HadoopCatalog(new Configuration(), warehousePath); + } + } +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java index ee6b4c0e19..4e535c3f8a 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java @@ -33,6 +33,7 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.types.Types; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService; import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter; import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory; @@ -193,7 +194,8 @@ public class TestDataFileActions { private Table initCatalog() throws IOException { TestHadoopCatalogService catalogService = new TestHadoopCatalogService(); - Catalog catalog = catalogService.getCatalog(); + IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService); + Catalog catalog = catalogFactory.create(); return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned()); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java index 7ca4bde3ac..36f14cedb8 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java @@ -17,7 +17,6 @@ */ package org.apache.nifi.processors.iceberg; -import org.apache.hadoop.conf.Configuration; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService; import org.apache.nifi.reporting.InitializationException; @@ -27,6 +26,9 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.List; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,8 +58,8 @@ public class TestPutIcebergCustomValidation { runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME); } - private void initCatalogService(Configuration configuration) throws InitializationException { - TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfig(configuration).build(); + private void initCatalogService(List configFilePaths) throws InitializationException { + TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfigFilePaths(configFilePaths).build(); runner.addControllerService(CATALOG_SERVICE_NAME, catalogService); runner.enableControllerService(catalogService); @@ -78,10 +80,7 @@ public class TestPutIcebergCustomValidation { @Test public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException { initRecordReader(); - - Configuration config = new Configuration(); - config.set("hadoop.security.authentication", "kerberos"); - initCatalogService(config); + initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml")); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); @@ -91,10 +90,7 @@ public class TestPutIcebergCustomValidation { @Test public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException { initRecordReader(); - - Configuration config = new Configuration(); - config.set("hadoop.security.authentication", "kerberos"); - initCatalogService(config); + initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml")); initKerberosUserService(); @@ -106,8 +102,7 @@ public class TestPutIcebergCustomValidation { @Test public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException { initRecordReader(); - - initCatalogService(new Configuration()); + initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml")); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); @@ -117,8 +112,7 @@ public class TestPutIcebergCustomValidation { @Test public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException { initRecordReader(); - - initCatalogService(new Configuration()); + initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml")); initKerberosUserService(); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java index 9b75ee9ef6..ff8f5a9a3e 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java @@ -27,6 +27,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordParser; @@ -62,6 +63,7 @@ public class TestPutIcebergWithHadoopCatalog { private TestRunner runner; private PutIceberg processor; private Schema inputSchema; + private Catalog catalog; private static final Namespace NAMESPACE = Namespace.of("default"); @@ -100,9 +102,10 @@ public class TestPutIcebergWithHadoopCatalog { runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory"); } - private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException { + private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException { TestHadoopCatalogService catalogService = new TestHadoopCatalogService(); - Catalog catalog = catalogService.getCatalog(); + IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService); + catalog = catalogFactory.create(); Map tableProperties = new HashMap<>(); tableProperties.put(TableProperties.FORMAT_VERSION, "2"); @@ -114,8 +117,6 @@ public class TestPutIcebergWithHadoopCatalog { runner.enableControllerService(catalogService); runner.setProperty(PutIceberg.CATALOG, "catalog-service"); - - return catalog; } @DisabledOnOs(WINDOWS) @@ -128,7 +129,7 @@ public class TestPutIcebergWithHadoopCatalog { runner = TestRunners.newTestRunner(processor); initRecordReader(); - Catalog catalog = initCatalog(spec, fileFormat); + initCatalog(spec, fileFormat); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default"); runner.setProperty(PutIceberg.TABLE_NAME, "date"); runner.setValidateExpressionUsage(false); @@ -156,7 +157,7 @@ public class TestPutIcebergWithHadoopCatalog { runner = TestRunners.newTestRunner(processor); initRecordReader(); - Catalog catalog = initCatalog(spec, fileFormat); + initCatalog(spec, fileFormat); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default"); runner.setProperty(PutIceberg.TABLE_NAME, "date"); runner.setValidateExpressionUsage(false); @@ -185,7 +186,7 @@ public class TestPutIcebergWithHadoopCatalog { runner = TestRunners.newTestRunner(processor); initRecordReader(); - Catalog catalog = initCatalog(spec, fileFormat); + initCatalog(spec, fileFormat); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default"); runner.setProperty(PutIceberg.TABLE_NAME, "date"); runner.setValidateExpressionUsage(false); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java index 3ec70c3d37..c672d90e8b 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java @@ -22,12 +22,14 @@ import org.apache.commons.io.IOUtils; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.hive.metastore.ThriftMetastore; +import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService; import org.apache.nifi.processors.iceberg.util.IcebergTestUtils; import org.apache.nifi.reporting.InitializationException; @@ -66,7 +68,7 @@ public class TestPutIcebergWithHiveCatalog { private TestRunner runner; private PutIceberg processor; private Schema inputSchema; - private TestHiveCatalogService catalogService; + private Catalog catalog; @RegisterExtension public static ThriftMetastore metastore = new ThriftMetastore(); @@ -90,16 +92,11 @@ public class TestPutIcebergWithHiveCatalog { inputSchema = new Schema.Parser().parse(avroSchema); processor = new PutIceberg(); - - catalogService = new TestHiveCatalogService.Builder() - .withMetastoreUri(metastore.getThriftConnectionUri()) - .withWarehouseLocation(metastore.getWarehouseLocation()) - .build(); } @AfterEach public void tearDown() { - catalogService.getCatalog().dropTable(TABLE_IDENTIFIER); + catalog.dropTable(TABLE_IDENTIFIER); } private void initRecordReader() throws InitializationException { @@ -126,7 +123,15 @@ public class TestPutIcebergWithHiveCatalog { tableProperties.put(TableProperties.FORMAT_VERSION, "2"); tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat); - catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties); + TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder() + .withMetastoreUri(metastore.getThriftConnectionUri()) + .withWarehouseLocation(metastore.getWarehouseLocation()) + .build(); + + IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService); + catalog = catalogFactory.create(); + + catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties); runner.addControllerService("catalog-service", catalogService); runner.enableControllerService(catalogService); @@ -150,7 +155,7 @@ public class TestPutIcebergWithHiveCatalog { runner.enqueue(new byte[0]); runner.run(); - Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER); + Table table = catalog.loadTable(TABLE_IDENTIFIER); List expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) .add(0, "John", "Finance") @@ -187,7 +192,7 @@ public class TestPutIcebergWithHiveCatalog { runner.enqueue(new byte[0]); runner.run(); - Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER); + Table table = catalog.loadTable(TABLE_IDENTIFIER); List expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) .add(0, "John", "Finance") @@ -225,7 +230,7 @@ public class TestPutIcebergWithHiveCatalog { runner.enqueue(new byte[0]); runner.run(); - Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER); + Table table = catalog.loadTable(TABLE_IDENTIFIER); List expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) .add(0, "John", "Finance") @@ -266,7 +271,7 @@ public class TestPutIcebergWithHiveCatalog { runner.enqueue(new byte[0], attributes); runner.run(); - Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER); + Table table = catalog.loadTable(TABLE_IDENTIFIER); List expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) .add(0, "John", "Finance") diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java index 111c4c5720..9673b894a3 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java @@ -17,35 +17,40 @@ */ package org.apache.nifi.processors.iceberg.catalog; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.services.iceberg.IcebergCatalogProperty; import org.apache.nifi.services.iceberg.IcebergCatalogService; +import org.apache.nifi.services.iceberg.IcebergCatalogType; import java.io.File; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static java.nio.file.Files.createTempDirectory; public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService { - private final HadoopCatalog catalog; + private final Map catalogProperties = new HashMap<>(); public TestHadoopCatalogService() throws IOException { File warehouseLocation = createTempDirectory("metastore").toFile(); - - catalog = new HadoopCatalog(new Configuration(), warehouseLocation.getAbsolutePath()); + catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, warehouseLocation.getAbsolutePath()); } @Override - public Catalog getCatalog() { - return catalog; + public IcebergCatalogType getCatalogType() { + return IcebergCatalogType.HADOOP; } @Override - public Configuration getConfiguration() { - return catalog.getConf(); + public Map getCatalogProperties() { + return catalogProperties; } + @Override + public List getConfigFilePaths() { + return null; + } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java index 3a65e944f5..0cd7f042a8 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java @@ -17,28 +17,47 @@ */ package org.apache.nifi.processors.iceberg.catalog; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.hive.HiveCatalog; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.services.iceberg.IcebergCatalogProperty; import org.apache.nifi.services.iceberg.IcebergCatalogService; +import org.apache.nifi.services.iceberg.IcebergCatalogType; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI; +import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION; + public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService { - private final HiveCatalog catalog; + private final List configFilePaths; + private final Map catalogProperties; - public TestHiveCatalogService(HiveCatalog catalog) { - this.catalog = catalog; + public TestHiveCatalogService(Map catalogProperties, List configFilePaths) { + this.catalogProperties = catalogProperties; + this.configFilePaths = configFilePaths; + } + + @Override + public IcebergCatalogType getCatalogType() { + return IcebergCatalogType.HIVE; + } + + @Override + public Map getCatalogProperties() { + return catalogProperties; + } + + @Override + public List getConfigFilePaths() { + return configFilePaths; } public static class Builder { private String metastoreUri; private String warehouseLocation; - private Configuration config; + private List configFilePaths; public Builder withMetastoreUri(String metastoreUri) { this.metastoreUri = metastoreUri; @@ -50,40 +69,23 @@ public class TestHiveCatalogService extends AbstractControllerService implements return this; } - public Builder withConfig(Configuration config) { - this.config = config; + public Builder withConfigFilePaths(List configFilePaths) { + this.configFilePaths = configFilePaths; return this; } public TestHiveCatalogService build() { - HiveCatalog catalog = new HiveCatalog(); - Map properties = new HashMap<>(); + Map properties = new HashMap<>(); if (metastoreUri != null) { - properties.put(CatalogProperties.URI, metastoreUri); + properties.put(METASTORE_URI, metastoreUri); } if (warehouseLocation != null) { - properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation); + properties.put(WAREHOUSE_LOCATION, warehouseLocation); } - if (config != null) { - catalog.setConf(config); - } - - catalog.initialize("hive-catalog", properties); - return new TestHiveCatalogService(catalog); + return new TestHiveCatalogService(properties, configFilePaths); } } - - @Override - public Catalog getCatalog() { - return catalog; - } - - @Override - public Configuration getConfiguration() { - return catalog.getConf(); - } - } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml new file mode 100644 index 0000000000..0fd06a5383 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/secured-core-site.xml @@ -0,0 +1,22 @@ + + + + + + hadoop.security.authentication + kerberos + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml new file mode 100644 index 0000000000..d590a5039c --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/unsecured-core-site.xml @@ -0,0 +1,22 @@ + + + + + + hadoop.security.authentication + simple + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml index 3ce733bd25..4e7d773945 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api-nar/pom.xml @@ -37,166 +37,4 @@ nar - - - - - include-hadoop-aws - - false - - - - org.apache.hadoop - hadoop-aws - ${hadoop.version} - - - - - - include-hadoop-azure - - false - - - - org.apache.hadoop - hadoop-azure - ${hadoop.version} - - - com.google.guava - guava - - - com.fasterxml.jackson.core - jackson-core - - - commons-logging - commons-logging - - - - - org.apache.hadoop - hadoop-azure-datalake - ${hadoop.version} - - - com.fasterxml.jackson.core - jackson-core - - - - - - - - include-hadoop-cloud-storage - - false - - - - org.apache.hadoop - hadoop-cloud-storage - ${hadoop.version} - - - log4j - log4j - - - org.apache.logging.log4j - log4j-core - - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - - - - - - - - include-hadoop-ozone - - false - - - - org.apache.ozone - ozone-client - ${ozone.version} - - - commons-logging - commons-logging - - - org.apache.logging.log4j - log4j-core - - - org.bouncycastle - bcprov-jdk15on - - - org.bouncycastle - bcpkix-jdk15on - - - - - org.apache.ozone - ozone-filesystem - ${ozone.version} - - - org.bouncycastle - bcprov-jdk18on - - - org.bouncycastle - bcpkix-jdk18on - - - - - - include-hadoop-gcp - - false - - - - com.google.cloud.bigdataoss - gcs-connector - hadoop3-${gcs.version} - - - com.google.cloud.bigdataoss - util - ${gcs.version} - - - com.google.cloud.bigdataoss - util-hadoop - hadoop3-${gcs.version} - - - com.google.cloud.bigdataoss - gcsio - ${gcs.version} - - - - \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml index f7783db574..9cf2556a9e 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/pom.xml @@ -31,120 +31,5 @@ org.apache.nifi nifi-api - - - - org.apache.iceberg - iceberg-hive-metastore - ${iceberg.version} - - - org.apache.iceberg - iceberg-parquet - ${iceberg.version} - - - org.apache.iceberg - iceberg-orc - ${iceberg.version} - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - org.slf4j - slf4j-reload4j - - - commons-logging - commons-logging - - - javax.servlet - javax.servlet-api - - - - - org.apache.hive - hive-metastore - ${hive.version} - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - - - org.apache.logging.log4j - log4j-core - - - org.apache.logging.log4j - log4j-web - - - org.apache.logging.log4j - log4j-1.2-api - - - org.apache.logging.log4j - log4j-slf4j-impl - - - org.apache.orc - orc-core - - - org.apache.hbase - hbase-client - - - co.cask.tephra - tephra-api - - - co.cask.tephra - tephra-core - - - co.cask.tephra - tephra-hbase-compat-1.0 - - - org.apache.parquet - parquet-hadoop-bundle - - - com.tdunning - json - - - com.zaxxer - HikariCP - - - com.google.guava - guava - - - \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java new file mode 100644 index 0000000000..f4c55c39c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.iceberg; + +public enum IcebergCatalogProperty { + + METASTORE_URI("hive.metastore.uris"), + WAREHOUSE_LOCATION("hive.metastore.warehouse.dir"); + + private final String hadoopPropertyName; + + IcebergCatalogProperty(String hadoopPropertyName) { + this.hadoopPropertyName = hadoopPropertyName; + } + + public String getHadoopPropertyName() { + return hadoopPropertyName; + } + +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java index 991ac625dc..56e595d2e9 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java @@ -17,16 +17,19 @@ */ package org.apache.nifi.services.iceberg; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.Catalog; import org.apache.nifi.controller.ControllerService; +import java.util.List; +import java.util.Map; + /** * Provides a basic connector to Iceberg catalog services. */ public interface IcebergCatalogService extends ControllerService { - Catalog getCatalog(); + IcebergCatalogType getCatalogType(); - Configuration getConfiguration(); + Map getCatalogProperties(); + + List getConfigFilePaths(); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java new file mode 100644 index 0000000000..4b8640da1d --- /dev/null +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.iceberg; + +public enum IcebergCatalogType { + HIVE, + HADOOP +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml index 2a22d49aac..e81de19917 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml @@ -36,6 +36,11 @@ nifi-utils 2.0.0-SNAPSHOT + + org.apache.nifi + nifi-xml-processing + 2.0.0-SNAPSHOT + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java index 38f156c68d..7afc68a6a0 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java @@ -17,14 +17,24 @@ */ package org.apache.nifi.services.iceberg; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider; +import org.w3c.dom.Document; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** @@ -32,7 +42,9 @@ import org.apache.nifi.expression.ExpressionLanguageScope; */ public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService { - protected Configuration configuration = new Configuration(); + protected Map catalogProperties = new HashMap<>(); + + protected List configFilePaths; static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() .name("hadoop-config-resources") @@ -44,24 +56,38 @@ public abstract class AbstractCatalogService extends AbstractControllerService i .dynamicallyModifiesClasspath(true) .build(); - /** - * Loads configuration files from the provided paths. - * - * @param configFiles list of config file paths separated with comma - * @return merged configuration - */ - protected Configuration getConfigurationFromFiles(String configFiles) { - final Configuration conf = new Configuration(); - if (StringUtils.isNotBlank(configFiles)) { - for (final String configFile : configFiles.split(",")) { - conf.addResource(new Path(configFile.trim())); + protected List parseConfigFilePaths(String configFilePaths) { + List documentList = new ArrayList<>(); + for (final String configFile : createFilePathList(configFilePaths)) { + File file = new File(configFile.trim()); + try (final InputStream fis = new FileInputStream(file); + final InputStream in = new BufferedInputStream(fis)) { + final StandardDocumentProvider documentProvider = new StandardDocumentProvider(); + documentList.add(documentProvider.parse(in)); + } catch (IOException e) { + throw new ProcessException("Failed to load config files", e); } } - return conf; + return documentList; + } + + protected List createFilePathList(String configFilePaths) { + List filePathList = new ArrayList<>(); + if (configFilePaths != null && !configFilePaths.trim().isEmpty()) { + for (final String configFile : configFilePaths.split(",")) { + filePathList.add(configFile.trim()); + } + } + return filePathList; } @Override - public Configuration getConfiguration() { - return configuration; + public Map getCatalogProperties() { + return catalogProperties; + } + + @Override + public List getConfigFilePaths() { + return configFilePaths; } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java index dcf2dd395f..8f62e1e183 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java @@ -17,20 +17,20 @@ */ package org.apache.nifi.services.iceberg; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION; + @Tags({"iceberg", "catalog", "service", "hadoop", "hdfs"}) @CapabilityDescription("Catalog service that can use HDFS or similar file systems that support atomic rename.") public class HadoopCatalogService extends AbstractCatalogService { @@ -39,6 +39,7 @@ public class HadoopCatalogService extends AbstractCatalogService { .name("warehouse-path") .displayName("Warehouse Path") .description("Path to the location of the warehouse.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); @@ -53,25 +54,18 @@ public class HadoopCatalogService extends AbstractCatalogService { return PROPERTIES; } - private HadoopCatalog catalog; - @OnEnabled public void onEnabled(final ConfigurationContext context) { - final String warehousePath = context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue(); - if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) { - final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - - configuration = getConfigurationFromFiles(configFiles); - catalog = new HadoopCatalog(configuration, warehousePath); - } else { - catalog = new HadoopCatalog(new Configuration(), warehousePath); + configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue()); } + + catalogProperties.put(WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue()); } @Override - public Catalog getCatalog() { - return catalog; + public IcebergCatalogType getCatalogType() { + return IcebergCatalogType.HADOOP; } } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java index 25bafe8116..e609981d46 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java @@ -17,10 +17,6 @@ */ package org.apache.nifi.services.iceberg; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.hive.HiveCatalog; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -30,14 +26,14 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; @Tags({"iceberg", "catalog", "service", "metastore", "hive"}) @CapabilityDescription("Catalog service that connects to a Hive metastore to keep track of Iceberg tables.") @@ -47,7 +43,7 @@ public class HiveCatalogService extends AbstractCatalogService { .name("hive-metastore-uri") .displayName("Hive Metastore URI") .description("The URI location(s) for the Hive metastore; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.URI_LIST_VALIDATOR) .build(); @@ -55,6 +51,7 @@ public class HiveCatalogService extends AbstractCatalogService { .name("warehouse-location") .displayName("Default Warehouse Location") .description("Location of default database for the warehouse. This field sets or overrides the 'hive.metastore.warehouse.dir' configuration property.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); @@ -69,14 +66,12 @@ public class HiveCatalogService extends AbstractCatalogService { return PROPERTIES; } - private HiveCatalog catalog; - @Override protected Collection customValidate(ValidationContext validationContext) { final List problems = new ArrayList<>(); - String configMetastoreUri = null; - String configWarehouseLocation = null; + boolean configMetastoreUriPresent = false; + boolean configWarehouseLocationPresent = false; final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue(); final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue(); @@ -84,13 +79,30 @@ public class HiveCatalogService extends AbstractCatalogService { // Load the configurations for validation only if any config resource is provided and if either the metastore URI or the warehouse location property is missing if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet() && (propertyMetastoreUri == null || propertyWarehouseLocation == null)) { final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); + final List documents = parseConfigFilePaths(configFiles); - Configuration configuration = getConfigurationFromFiles(configFiles); - configMetastoreUri = configuration.get("hive.metastore.uris"); - configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir"); + for (Document document : documents) { + final NodeList nameNodeList = document.getElementsByTagName("name"); + + for (int i = 0; i < nameNodeList.getLength(); i++) { + final String nodeValue = nameNodeList.item(i).getFirstChild().getNodeValue(); + + if (nodeValue.equals(IcebergCatalogProperty.METASTORE_URI.getHadoopPropertyName())) { + configMetastoreUriPresent = true; + } + + if (nodeValue.equals(IcebergCatalogProperty.WAREHOUSE_LOCATION.getHadoopPropertyName())) { + configWarehouseLocationPresent = true; + } + + if (configMetastoreUriPresent && configWarehouseLocationPresent) { + break; + } + } + } } - if (configMetastoreUri == null && propertyMetastoreUri == null) { + if (!configMetastoreUriPresent && propertyMetastoreUri == null) { problems.add(new ValidationResult.Builder() .subject("Hive Metastore URI") .valid(false) @@ -99,7 +111,7 @@ public class HiveCatalogService extends AbstractCatalogService { .build()); } - if (configWarehouseLocation == null && propertyWarehouseLocation == null) { + if (!configWarehouseLocationPresent && propertyWarehouseLocation == null) { problems.add(new ValidationResult.Builder() .subject("Default Warehouse Location") .valid(false) @@ -113,29 +125,21 @@ public class HiveCatalogService extends AbstractCatalogService { @OnEnabled public void onEnabled(final ConfigurationContext context) { - catalog = new HiveCatalog(); - Map properties = new HashMap<>(); - if (context.getProperty(METASTORE_URI).isSet()) { - properties.put(CatalogProperties.URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue()); + catalogProperties.put(IcebergCatalogProperty.METASTORE_URI, context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue()); } if (context.getProperty(WAREHOUSE_LOCATION).isSet()) { - properties.put(CatalogProperties.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue()); + catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue()); } if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) { - final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); - - configuration = getConfigurationFromFiles(configFiles); - catalog.setConf(configuration); + configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue()); } - - catalog.initialize("hive-catalog", properties); } @Override - public Catalog getCatalog() { - return catalog; + public IcebergCatalogType getCatalogType() { + return IcebergCatalogType.HIVE; } }