diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 1af97768f3..d85a590a08 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -34,8 +34,11 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -49,7 +52,9 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
 
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -192,6 +197,36 @@ public class PutIceberg extends AbstractIcebergProcessor {
         return RELATIONSHIPS;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        final List<ValidationResult> problems = new ArrayList<>();
+        final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        boolean catalogServiceEnabled = context.getControllerServiceLookup().isControllerServiceEnabled(catalogService);
+
+        if (catalogServiceEnabled) {
+            final boolean kerberosUserServiceIsSet = context.getProperty(KERBEROS_USER_SERVICE).isSet();
+            final boolean securityEnabled = SecurityUtil.isSecurityEnabled(catalogService.getConfiguration());
+
+            if (securityEnabled && !kerberosUserServiceIsSet) {
+                problems.add(new ValidationResult.Builder()
+                        .subject(KERBEROS_USER_SERVICE.getDisplayName())
+                        .valid(false)
+                        .explanation("'hadoop.security.authentication' is set to 'kerberos' in the hadoop configuration files but no KerberosUserService is configured.")
+                        .build());
+            }
+
+            if (!securityEnabled && kerberosUserServiceIsSet) {
+                problems.add(new ValidationResult.Builder()
+                        .subject(KERBEROS_USER_SERVICE.getDisplayName())
+                        .valid(false)
+                        .explanation("KerberosUserService is configured but 'hadoop.security.authentication' is not set to 'kerberos' in the hadoop configuration files.")
+                        .build());
+            }
+        }
+
+        return problems;
+    }
+
     @Override
     public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
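For review context, here is a standalone sketch of the decision table the new customValidate() enforces: Kerberos must be configured on both sides (Hadoop configuration and KerberosUserService) or on neither. The KerberosConfigCheck class and its validate() helper are hypothetical illustrations, not part of the patch, and the sketch assumes SecurityUtil.isSecurityEnabled() reduces to inspecting 'hadoop.security.authentication'.

import org.apache.hadoop.conf.Configuration;

// Hypothetical illustration, not part of the patch.
public class KerberosConfigCheck {

    static String validate(Configuration conf, boolean kerberosUserServiceSet) {
        // Assumption: SecurityUtil.isSecurityEnabled() boils down to this property check.
        final boolean securityEnabled = "kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"));

        if (securityEnabled && !kerberosUserServiceSet) {
            return "kerberos is enabled but no KerberosUserService is configured";
        }
        if (!securityEnabled && kerberosUserServiceSet) {
            return "KerberosUserService is configured but kerberos is not enabled";
        }
        return null; // valid combination
    }

    public static void main(String[] args) {
        final Configuration secured = new Configuration(false);
        secured.set("hadoop.security.authentication", "kerberos");

        System.out.println(validate(secured, true));                   // null -> valid
        System.out.println(validate(secured, false));                  // invalid: service missing
        System.out.println(validate(new Configuration(false), true));  // invalid: service without kerberos
    }
}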
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
new file mode 100644
index 0000000000..7ca4bde3ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPutIcebergCustomValidation {
+
+    private static final String RECORD_READER_NAME = "record-reader";
+    private static final String KERBEROS_USER_SERVICE_NAME = "kerberos-user-service";
+    private static final String CATALOG_SERVICE_NAME = "catalog-service";
+
+    private static final String CATALOG_NAMESPACE = "catalogNamespace";
+    private static final String TABLE_NAME = "tableName";
+
+    private TestRunner runner;
+
+    @BeforeEach
+    public void setUp() {
+        PutIceberg processor = new PutIceberg();
+        runner = TestRunners.newTestRunner(processor);
+    }
+
+    private void initRecordReader() throws InitializationException {
+        MockRecordParser readerFactory = new MockRecordParser();
+
+        runner.addControllerService(RECORD_READER_NAME, readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME);
+    }
+
+    private void initCatalogService(Configuration configuration) throws InitializationException {
+        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfig(configuration).build();
+
+        runner.addControllerService(CATALOG_SERVICE_NAME, catalogService);
+        runner.enableControllerService(catalogService);
+
+        runner.setProperty(PutIceberg.CATALOG, CATALOG_SERVICE_NAME);
+    }
+
+    private void initKerberosUserService() throws InitializationException {
+        KerberosUserService kerberosUserService = mock(KerberosUserService.class);
+        when(kerberosUserService.getIdentifier()).thenReturn(KERBEROS_USER_SERVICE_NAME);
+
+        runner.addControllerService(KERBEROS_USER_SERVICE_NAME, kerberosUserService);
+        runner.enableControllerService(kerberosUserService);
+
+        runner.setProperty(PutIceberg.KERBEROS_USER_SERVICE, KERBEROS_USER_SERVICE_NAME);
+    }
+
+    @Test
+    public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
+        initRecordReader();
+
+        Configuration config = new Configuration();
+        config.set("hadoop.security.authentication", "kerberos");
+        initCatalogService(config);
+
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
+        initRecordReader();
+
+        Configuration config = new Configuration();
+        config.set("hadoop.security.authentication", "kerberos");
+        initCatalogService(config);
+
+        initKerberosUserService();
+
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.assertValid();
+    }
+
+    @Test
+    public void testCustomValidateWithoutKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
+        initRecordReader();
+
+        initCatalogService(new Configuration());
+
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.assertValid();
+    }
+
+    @Test
+    public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException {
+        initRecordReader();
+
+        initCatalogService(new Configuration());
+
+        initKerberosUserService();
+
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.assertNotValid();
+    }
+}
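One detail worth calling out in the test above: TestRunner tracks controller services by identifier, and a Mockito mock never goes through the initialize() call that normally assigns one, so getIdentifier() must be stubbed before registration. A minimal sketch of that pattern follows; the MockServiceSupport class and its method name are illustrative only, not part of the patch.

import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Illustrative helper, not part of the patch.
class MockServiceSupport {

    static KerberosUserService registerMockKerberosService(TestRunner runner, String id) throws InitializationException {
        final KerberosUserService service = mock(KerberosUserService.class);
        // Without this stub the mock would report a null identifier and the
        // runner could not resolve the service when the processor is validated.
        when(service.getIdentifier()).thenReturn(id);

        runner.addControllerService(id, service);
        runner.enableControllerService(service);
        return service;
    }
}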
config.set("hadoop.security.authentication", "kerberos"); + initCatalogService(config); + + runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE); + runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); + runner.assertNotValid(); + } + + @Test + public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException { + initRecordReader(); + + Configuration config = new Configuration(); + config.set("hadoop.security.authentication", "kerberos"); + initCatalogService(config); + + initKerberosUserService(); + + runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE); + runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); + runner.assertValid(); + } + + @Test + public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException { + initRecordReader(); + + initCatalogService(new Configuration()); + + runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE); + runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); + runner.assertValid(); + } + + @Test + public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException { + initRecordReader(); + + initCatalogService(new Configuration()); + + initKerberosUserService(); + + runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE); + runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME); + runner.assertNotValid(); + } +} diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java index 62071102e6..33b1999362 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java @@ -107,7 +107,10 @@ public class TestPutIcebergWithHiveCatalog { } private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException { - TestHiveCatalogService catalogService = new TestHiveCatalogService(metastore.getThriftConnectionUri(), metastore.getWarehouseLocation()); + TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder() + .withMetastoreUri(metastore.getThriftConnectionUri()) + .withWarehouseLocation(metastore.getWarehouseLocation()) + .build(); Catalog catalog = catalogService.getCatalog(); Map tableProperties = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java index d080067216..111c4c5720 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java @@ -30,7 +30,7 @@ import static java.nio.file.Files.createTempDirectory; public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService { - 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
index d080067216..111c4c5720 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
@@ -30,7 +30,7 @@ import static java.nio.file.Files.createTempDirectory;
 
 public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    private final Catalog catalog;
+    private final HadoopCatalog catalog;
 
     public TestHadoopCatalogService() throws IOException {
         File warehouseLocation = createTempDirectory("metastore").toFile();
@@ -45,7 +45,7 @@ public class TestHadoopCatalogService extends AbstractControllerService implemen
 
     @Override
     public Configuration getConfiguration() {
-        return null;
+        return catalog.getConf();
     }
 }
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
index a0a5bf441e..3a65e944f5 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
@@ -29,20 +29,51 @@ import java.util.Map;
 
 public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    private Catalog catalog;
+    private final HiveCatalog catalog;
 
-    public TestHiveCatalogService(String metastoreUri, String warehouseLocation) {
-        initCatalog(metastoreUri, warehouseLocation);
+    public TestHiveCatalogService(HiveCatalog catalog) {
+        this.catalog = catalog;
     }
 
-    public void initCatalog(String metastoreUri, String warehouseLocation) {
-        catalog = new HiveCatalog();
+    public static class Builder {
+        private String metastoreUri;
+        private String warehouseLocation;
+        private Configuration config;
 
-        Map<String, String> properties = new HashMap<>();
-        properties.put(CatalogProperties.URI, metastoreUri);
-        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+        public Builder withMetastoreUri(String metastoreUri) {
+            this.metastoreUri = metastoreUri;
+            return this;
+        }
 
-        catalog.initialize("hive-catalog", properties);
+        public Builder withWarehouseLocation(String warehouseLocation) {
+            this.warehouseLocation = warehouseLocation;
+            return this;
+        }
+
+        public Builder withConfig(Configuration config) {
+            this.config = config;
+            return this;
+        }
+
+        public TestHiveCatalogService build() {
+            HiveCatalog catalog = new HiveCatalog();
+            Map<String, String> properties = new HashMap<>();
+
+            if (metastoreUri != null) {
+                properties.put(CatalogProperties.URI, metastoreUri);
+            }
+
+            if (warehouseLocation != null) {
+                properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+            }
+
+            if (config != null) {
+                catalog.setConf(config);
+            }
+
+            catalog.initialize("hive-catalog", properties);
+            return new TestHiveCatalogService(catalog);
+        }
     }
 
     @Override
@@ -52,7 +83,7 @@ public class TestHiveCatalogService extends AbstractControllerService implements
 
     @Override
     public Configuration getConfiguration() {
-        return null;
+        return catalog.getConf();
     }
 }
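The Builder above supports the two wiring styles used by the tests in this patch: explicit metastore coordinates for the integration test, or only a Hadoop Configuration for the custom-validation tests. A sketch of both follows; the BuilderUsageSketch class is hypothetical, and the URI and warehouse path are placeholders, not values from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;

// Sketch of the two Builder usages, not part of the patch.
class BuilderUsageSketch {

    // Integration-test style: explicit metastore coordinates.
    static TestHiveCatalogService withMetastore() {
        return new TestHiveCatalogService.Builder()
                .withMetastoreUri("thrift://localhost:9083") // placeholder URI
                .withWarehouseLocation("/tmp/warehouse")     // placeholder path
                .build();
    }

    // Validation-test style: only a Hadoop Configuration, so getConfiguration()
    // returns something meaningful for SecurityUtil.isSecurityEnabled().
    static TestHiveCatalogService withKerberosConfigOnly() {
        final Configuration config = new Configuration();
        config.set("hadoop.security.authentication", "kerberos");
        return new TestHiveCatalogService.Builder()
                .withConfig(config)
                .build();
    }
}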
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
index 9dc648c3e9..25bafe8116 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
@@ -78,7 +78,11 @@ public class HiveCatalogService extends AbstractCatalogService {
         String configMetastoreUri = null;
         String configWarehouseLocation = null;
 
-        if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+        final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
+        final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
+
+        // Load the configuration for validation only when config resources are provided and either the metastore URI or the warehouse location property is missing.
+        if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet() && (propertyMetastoreUri == null || propertyWarehouseLocation == null)) {
             final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
 
             Configuration configuration = getConfigurationFromFiles(configFiles);
@@ -86,9 +90,6 @@ public class HiveCatalogService extends AbstractCatalogService {
             configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
         }
 
-        final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
-        final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
-
         if (configMetastoreUri == null && propertyMetastoreUri == null) {
             problems.add(new ValidationResult.Builder()
                 .subject("Hive Metastore URI")
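Finally, a condensed view of the reordered HiveCatalogService validation: the Hadoop configuration resources are now parsed only when they can still contribute a value that the explicit properties left missing. The class and method names below are hypothetical, not part of the patch.

// Hypothetical condensation of the reordered check, not part of the patch.
class MetastoreValidationSketch {

    static boolean needsConfigLookup(String propertyMetastoreUri, String propertyWarehouseLocation, boolean configResourcesSet) {
        // Parsing the configuration files costs I/O, so it is skipped when the
        // explicit properties already cover both values.
        return configResourcesSet && (propertyMetastoreUri == null || propertyWarehouseLocation == null);
    }

    public static void main(String[] args) {
        System.out.println(needsConfigLookup("thrift://host:9083", "/warehouse", true)); // false: nothing missing
        System.out.println(needsConfigLookup(null, "/warehouse", true));                 // true: URI must come from the config files
        System.out.println(needsConfigLookup(null, null, false));                       // false: no config resources to read
    }
}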