mirror of https://github.com/apache/nifi.git
NIFI-11215: Add custom validation for KerberosUserService in PutIceberg
This closes #6985. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
feebb2c399
commit
b50c8101cf
|
@ -34,8 +34,11 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.hadoop.SecurityUtil;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
|
@ -49,7 +52,9 @@ import org.apache.nifi.serialization.record.Record;
|
|||
import org.apache.nifi.services.iceberg.IcebergCatalogService;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -192,6 +197,36 @@ public class PutIceberg extends AbstractIcebergProcessor {
|
|||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
final List<ValidationResult> problems = new ArrayList<>();
|
||||
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
|
||||
boolean catalogServiceEnabled = context.getControllerServiceLookup().isControllerServiceEnabled(catalogService);
|
||||
|
||||
if (catalogServiceEnabled) {
|
||||
final boolean kerberosUserServiceIsSet = context.getProperty(KERBEROS_USER_SERVICE).isSet();
|
||||
final boolean securityEnabled = SecurityUtil.isSecurityEnabled(catalogService.getConfiguration());
|
||||
|
||||
if (securityEnabled && !kerberosUserServiceIsSet) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_USER_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("'hadoop.security.authentication' is set to 'kerberos' in the hadoop configuration files but no KerberosUserService is configured.")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (!securityEnabled && kerberosUserServiceIsSet) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_USER_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("KerberosUserService is configured but 'hadoop.security.authentication' is not set to 'kerberos' in the hadoop configuration files.")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
|
||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.iceberg;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.nifi.kerberos.KerberosUserService;
|
||||
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestPutIcebergCustomValidation {
|
||||
|
||||
private static final String RECORD_READER_NAME = "record-reader";
|
||||
private static final String KERBEROS_USER_SERVICE_NAME = "kerberos-user-service";
|
||||
private static final String CATALOG_SERVICE_NAME = "catalog-service";
|
||||
|
||||
private static final String CATALOG_NAMESPACE = "catalogNamespace";
|
||||
private static final String TABLE_NAME = "tableName";
|
||||
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
PutIceberg processor = new PutIceberg();
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
}
|
||||
|
||||
private void initRecordReader() throws InitializationException {
|
||||
MockRecordParser readerFactory = new MockRecordParser();
|
||||
|
||||
runner.addControllerService(RECORD_READER_NAME, readerFactory);
|
||||
runner.enableControllerService(readerFactory);
|
||||
|
||||
runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME);
|
||||
}
|
||||
|
||||
private void initCatalogService(Configuration configuration) throws InitializationException {
|
||||
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfig(configuration).build();
|
||||
|
||||
runner.addControllerService(CATALOG_SERVICE_NAME, catalogService);
|
||||
runner.enableControllerService(catalogService);
|
||||
|
||||
runner.setProperty(PutIceberg.CATALOG, CATALOG_SERVICE_NAME);
|
||||
}
|
||||
|
||||
private void initKerberosUserService() throws InitializationException {
|
||||
KerberosUserService kerberosUserService = mock(KerberosUserService.class);
|
||||
when(kerberosUserService.getIdentifier()).thenReturn(KERBEROS_USER_SERVICE_NAME);
|
||||
|
||||
runner.addControllerService(KERBEROS_USER_SERVICE_NAME, kerberosUserService);
|
||||
runner.enableControllerService(kerberosUserService);
|
||||
|
||||
runner.setProperty(PutIceberg.KERBEROS_USER_SERVICE, KERBEROS_USER_SERVICE_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
|
||||
initRecordReader();
|
||||
|
||||
Configuration config = new Configuration();
|
||||
config.set("hadoop.security.authentication", "kerberos");
|
||||
initCatalogService(config);
|
||||
|
||||
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
|
||||
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
|
||||
initRecordReader();
|
||||
|
||||
Configuration config = new Configuration();
|
||||
config.set("hadoop.security.authentication", "kerberos");
|
||||
initCatalogService(config);
|
||||
|
||||
initKerberosUserService();
|
||||
|
||||
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
|
||||
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
|
||||
initRecordReader();
|
||||
|
||||
initCatalogService(new Configuration());
|
||||
|
||||
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
|
||||
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException {
|
||||
initRecordReader();
|
||||
|
||||
initCatalogService(new Configuration());
|
||||
|
||||
initKerberosUserService();
|
||||
|
||||
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
|
||||
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
|
||||
runner.assertNotValid();
|
||||
}
|
||||
}
|
|
@ -107,7 +107,10 @@ public class TestPutIcebergWithHiveCatalog {
|
|||
}
|
||||
|
||||
private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
|
||||
TestHiveCatalogService catalogService = new TestHiveCatalogService(metastore.getThriftConnectionUri(), metastore.getWarehouseLocation());
|
||||
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
|
||||
.withMetastoreUri(metastore.getThriftConnectionUri())
|
||||
.withWarehouseLocation(metastore.getWarehouseLocation())
|
||||
.build();
|
||||
Catalog catalog = catalogService.getCatalog();
|
||||
|
||||
Map<String, String> tableProperties = new HashMap<>();
|
||||
|
|
|
@ -30,7 +30,7 @@ import static java.nio.file.Files.createTempDirectory;
|
|||
|
||||
public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
|
||||
|
||||
private final Catalog catalog;
|
||||
private final HadoopCatalog catalog;
|
||||
|
||||
public TestHadoopCatalogService() throws IOException {
|
||||
File warehouseLocation = createTempDirectory("metastore").toFile();
|
||||
|
@ -45,7 +45,7 @@ public class TestHadoopCatalogService extends AbstractControllerService implemen
|
|||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return null;
|
||||
return catalog.getConf();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,20 +29,51 @@ import java.util.Map;
|
|||
|
||||
public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
|
||||
|
||||
private Catalog catalog;
|
||||
private final HiveCatalog catalog;
|
||||
|
||||
public TestHiveCatalogService(String metastoreUri, String warehouseLocation) {
|
||||
initCatalog(metastoreUri, warehouseLocation);
|
||||
public TestHiveCatalogService(HiveCatalog catalog) {
|
||||
this.catalog = catalog;
|
||||
}
|
||||
|
||||
public void initCatalog(String metastoreUri, String warehouseLocation) {
|
||||
catalog = new HiveCatalog();
|
||||
public static class Builder {
|
||||
private String metastoreUri;
|
||||
private String warehouseLocation;
|
||||
private Configuration config;
|
||||
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put(CatalogProperties.URI, metastoreUri);
|
||||
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
|
||||
public Builder withMetastoreUri(String metastoreUri) {
|
||||
this.metastoreUri = metastoreUri;
|
||||
return this;
|
||||
}
|
||||
|
||||
catalog.initialize("hive-catalog", properties);
|
||||
public Builder withWarehouseLocation(String warehouseLocation) {
|
||||
this.warehouseLocation = warehouseLocation;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withConfig(Configuration config) {
|
||||
this.config = config;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TestHiveCatalogService build() {
|
||||
HiveCatalog catalog = new HiveCatalog();
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
|
||||
if (metastoreUri != null) {
|
||||
properties.put(CatalogProperties.URI, metastoreUri);
|
||||
}
|
||||
|
||||
if (warehouseLocation != null) {
|
||||
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
|
||||
}
|
||||
|
||||
if (config != null) {
|
||||
catalog.setConf(config);
|
||||
}
|
||||
|
||||
catalog.initialize("hive-catalog", properties);
|
||||
return new TestHiveCatalogService(catalog);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,7 +83,7 @@ public class TestHiveCatalogService extends AbstractControllerService implements
|
|||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return null;
|
||||
return catalog.getConf();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -78,7 +78,11 @@ public class HiveCatalogService extends AbstractCatalogService {
|
|||
String configMetastoreUri = null;
|
||||
String configWarehouseLocation = null;
|
||||
|
||||
if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
|
||||
final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
|
||||
final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
|
||||
|
||||
// Load the configurations for validation only if any config resource is provided and if either the metastore URI or the warehouse location property is missing
|
||||
if (validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet() && (propertyMetastoreUri == null || propertyWarehouseLocation == null)) {
|
||||
final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
|
||||
|
||||
Configuration configuration = getConfigurationFromFiles(configFiles);
|
||||
|
@ -86,9 +90,6 @@ public class HiveCatalogService extends AbstractCatalogService {
|
|||
configWarehouseLocation = configuration.get("hive.metastore.warehouse.dir");
|
||||
}
|
||||
|
||||
final String propertyMetastoreUri = validationContext.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
|
||||
final String propertyWarehouseLocation = validationContext.getProperty(WAREHOUSE_LOCATION).evaluateAttributeExpressions().getValue();
|
||||
|
||||
if (configMetastoreUri == null && propertyMetastoreUri == null) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject("Hive Metastore URI")
|
||||
|
|
Loading…
Reference in New Issue