NIFI-11440 Speed up Iceberg Hive Metastore Tests

This closes #7170

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Bathori 2023-04-13 11:49:52 +02:00 committed by exceptionfactory
parent cff7808543
commit 44a7f7f38b
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 51 additions and 45 deletions

View File

@@ -19,15 +19,15 @@ package org.apache.nifi.hive.metastore;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */ /** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */
public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback { public class ThriftMetastore implements BeforeAllCallback, AfterAllCallback {
private final MetastoreCore metastoreCore; private final MetastoreCore metastoreCore;
@@ -43,12 +43,12 @@ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
} }
@Override @Override
public void beforeEach(ExtensionContext context) throws Exception { public void beforeAll(ExtensionContext context) throws Exception {
metastoreCore.initialize(configOverrides); metastoreCore.initialize(configOverrides);
} }
@Override @Override
public void afterEach(ExtensionContext context) { public void afterAll(ExtensionContext context) {
metastoreCore.shutdown(); metastoreCore.shutdown();
} }

View File

@@ -37,6 +37,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.thrift.TException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -56,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.condition.OS.WINDOWS; import static org.junit.jupiter.api.condition.OS.WINDOWS;
@DisabledOnOs(WINDOWS)
public class TestTriggerHiveMetaStoreEvent { public class TestTriggerHiveMetaStoreEvent {
private TestRunner runner; private TestRunner runner;
@@ -71,7 +74,7 @@ public class TestTriggerHiveMetaStoreEvent {
); );
@RegisterExtension @RegisterExtension
public ThriftMetastore metastore = new ThriftMetastore() public static ThriftMetastore metastore = new ThriftMetastore()
.withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(), "org.apache.hive.hcatalog.listener.DbNotificationListener")); .withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(), "org.apache.hive.hcatalog.listener.DbNotificationListener"));
@BeforeEach @BeforeEach
@@ -80,6 +83,11 @@ public class TestTriggerHiveMetaStoreEvent {
metaStoreClient = metastore.getMetaStoreClient(); metaStoreClient = metastore.getMetaStoreClient();
} }
@AfterEach
public void tearDown() throws TException {
metaStoreClient.dropTable(TEST_DATABASE_NAME, TEST_TABLE_NAME);
}
private void initUnPartitionedTable() throws Exception { private void initUnPartitionedTable() throws Exception {
createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, Collections.emptyList(), metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME); createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, Collections.emptyList(), metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME);
} }
@@ -91,7 +99,6 @@ public class TestTriggerHiveMetaStoreEvent {
createPartition(table, Lists.newArrayList("2018", "march")); createPartition(table, Lists.newArrayList("2018", "march"));
} }
@DisabledOnOs(WINDOWS)
@Test @Test
public void testInsertOnUnPartitionedTable() throws Exception { public void testInsertOnUnPartitionedTable() throws Exception {
initUnPartitionedTable(); initUnPartitionedTable();
@@ -126,7 +133,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertEquals(insertMessage.getTable(), TEST_TABLE_NAME); assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
} }
@DisabledOnOs(WINDOWS)
@Test @Test
public void testInsertOnPartitionedTable() throws Exception { public void testInsertOnPartitionedTable() throws Exception {
initPartitionedTable(); initPartitionedTable();
@@ -162,7 +168,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertEquals(insertMessage.getTable(), TEST_TABLE_NAME); assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
} }
@DisabledOnOs(WINDOWS)
@Test @Test
public void testAddPartition() throws Exception { public void testAddPartition() throws Exception {
initPartitionedTable(); initPartitionedTable();
@@ -204,7 +209,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertDoesNotThrow(() -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june"))); assertDoesNotThrow(() -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
} }
@DisabledOnOs(WINDOWS)
@Test @Test
public void testDropPartition() throws Exception { public void testDropPartition() throws Exception {
initPartitionedTable(); initPartitionedTable();
@@ -246,7 +250,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertThrows(NoSuchObjectException.class, () -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june"))); assertThrows(NoSuchObjectException.class, () -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
} }
@DisabledOnOs(WINDOWS)
@Test @Test
public void testUnknownEventType() throws Exception { public void testUnknownEventType() throws Exception {
initUnPartitionedTable(); initUnPartitionedTable();

View File

@@ -22,7 +22,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table; import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record; import org.apache.iceberg.data.Record;
@@ -38,6 +37,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -60,18 +60,23 @@ import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateN
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders; import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
import static org.junit.jupiter.api.condition.OS.WINDOWS; import static org.junit.jupiter.api.condition.OS.WINDOWS;
@DisabledOnOs(WINDOWS)
public class TestPutIcebergWithHiveCatalog { public class TestPutIcebergWithHiveCatalog {
private TestRunner runner; private TestRunner runner;
private PutIceberg processor; private PutIceberg processor;
private Schema inputSchema; private Schema inputSchema;
private TestHiveCatalogService catalogService;
@RegisterExtension @RegisterExtension
public ThriftMetastore metastore = new ThriftMetastore(); public static ThriftMetastore metastore = new ThriftMetastore();
private static final Namespace NAMESPACE = Namespace.of("test_metastore"); private static final String CATALOG_NAME = "test_metastore";
private static final String TABLE_NAME = "users";
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "users"); private static final Namespace NAMESPACE = Namespace.of(CATALOG_NAME);
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema( private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.required(1, "id", Types.IntegerType.get()),
@@ -85,6 +90,16 @@ public class TestPutIcebergWithHiveCatalog {
inputSchema = new Schema.Parser().parse(avroSchema); inputSchema = new Schema.Parser().parse(avroSchema);
processor = new PutIceberg(); processor = new PutIceberg();
catalogService = new TestHiveCatalogService.Builder()
.withMetastoreUri(metastore.getThriftConnectionUri())
.withWarehouseLocation(metastore.getWarehouseLocation())
.build();
}
@AfterEach
public void tearDown() {
catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
} }
private void initRecordReader() throws InitializationException { private void initRecordReader() throws InitializationException {
@@ -106,28 +121,19 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory"); runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
} }
private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException { private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
.withMetastoreUri(metastore.getThriftConnectionUri())
.withWarehouseLocation(metastore.getWarehouseLocation())
.build();
Catalog catalog = catalogService.getCatalog();
Map<String, String> tableProperties = new HashMap<>(); Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2"); tableProperties.put(TableProperties.FORMAT_VERSION, "2");
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat); tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties); catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
runner.addControllerService("catalog-service", catalogService); runner.addControllerService("catalog-service", catalogService);
runner.enableControllerService(catalogService); runner.enableControllerService(catalogService);
runner.setProperty(PutIceberg.CATALOG, "catalog-service"); runner.setProperty(PutIceberg.CATALOG, "catalog-service");
return catalog;
} }
@DisabledOnOs(WINDOWS)
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"avro"}) @ValueSource(strings = {"avro"})
public void onTriggerPartitioned(String fileFormat) throws Exception { public void onTriggerPartitioned(String fileFormat) throws Exception {
@@ -137,14 +143,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
initRecordReader(); initRecordReader();
Catalog catalog = initCatalog(spec, fileFormat); initCatalog(spec, fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore"); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, "users"); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
runner.run(); runner.run();
Table table = catalog.loadTable(TABLE_IDENTIFIER); Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance") .add(0, "John", "Finance")
@@ -165,7 +171,6 @@ public class TestPutIcebergWithHiveCatalog {
"department_bucket=0", "department_bucket=1", "department_bucket=2")); "department_bucket=0", "department_bucket=1", "department_bucket=2"));
} }
@DisabledOnOs(WINDOWS)
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"orc"}) @ValueSource(strings = {"orc"})
public void onTriggerIdentityPartitioned(String fileFormat) throws Exception { public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
@@ -175,14 +180,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
initRecordReader(); initRecordReader();
Catalog catalog = initCatalog(spec, fileFormat); initCatalog(spec, fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore"); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, "users"); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
runner.run(); runner.run();
Table table = catalog.loadTable(TABLE_IDENTIFIER); Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance") .add(0, "John", "Finance")
@@ -203,7 +208,6 @@ public class TestPutIcebergWithHiveCatalog {
"department=Finance", "department=Marketing", "department=Sales")); "department=Finance", "department=Marketing", "department=Sales"));
} }
@DisabledOnOs(WINDOWS)
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"parquet"}) @ValueSource(strings = {"parquet"})
public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception { public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
@@ -214,14 +218,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
initRecordReader(); initRecordReader();
Catalog catalog = initCatalog(spec, fileFormat); initCatalog(spec, fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore"); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, "users"); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
runner.run(); runner.run();
Table table = catalog.loadTable(TABLE_IDENTIFIER); Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance") .add(0, "John", "Finance")
@@ -246,20 +250,19 @@ public class TestPutIcebergWithHiveCatalog {
)); ));
} }
@DisabledOnOs(WINDOWS)
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"avro"}) @ValueSource(strings = {"avro"})
public void onTriggerUnPartitioned(String fileFormat) throws Exception { public void onTriggerUnPartitioned(String fileFormat) throws Exception {
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
initRecordReader(); initRecordReader();
Catalog catalog = initCatalog(PartitionSpec.unpartitioned(), fileFormat); initCatalog(PartitionSpec.unpartitioned(), fileFormat);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore"); runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, "users"); runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]); runner.enqueue(new byte[0]);
runner.run(); runner.run();
Table table = catalog.loadTable(TABLE_IDENTIFIER); Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA) List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance") .add(0, "John", "Finance")