From 44a7f7f38b3950a8a0135d1ce8602a4890c3d995 Mon Sep 17 00:00:00 2001
From: Mark Bathori
Date: Thu, 13 Apr 2023 11:49:52 +0200
Subject: [PATCH] NIFI-11440 Speed up Iceberg Hive Metastore Tests

This closes #7170

Signed-off-by: David Handermann
---
 .../nifi/hive/metastore/ThriftMetastore.java  | 10 +--
 .../hive/TestTriggerHiveMetaStoreEvent.java   | 15 ++--
 .../TestPutIcebergWithHiveCatalog.java        | 71 ++++++++++---------
 3 files changed, 51 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
index 5e1425d667..2b3dc4da5c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
@@ -19,15 +19,15 @@ package org.apache.nifi.hive.metastore;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;

 import java.util.HashMap;
 import java.util.Map;

 /** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */
-public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
+public class ThriftMetastore implements BeforeAllCallback, AfterAllCallback {

     private final MetastoreCore metastoreCore;

@@ -43,12 +43,12 @@ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
     }

     @Override
-    public void beforeEach(ExtensionContext context) throws Exception {
+    public void beforeAll(ExtensionContext context) throws Exception {
         metastoreCore.initialize(configOverrides);
     }

     @Override
-    public void afterEach(ExtensionContext context) {
+    public void afterAll(ExtensionContext context) {
         metastoreCore.shutdown();
     }

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
index 0cd52f801c..72e312439f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
@@ -37,6 +37,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -56,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;

+@DisabledOnOs(WINDOWS)
 public class TestTriggerHiveMetaStoreEvent {

     private TestRunner runner;
@@ -71,7 +74,7 @@
     );

     @RegisterExtension
-    public ThriftMetastore metastore = new ThriftMetastore()
+    public static ThriftMetastore metastore = new ThriftMetastore()
             .withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(), "org.apache.hive.hcatalog.listener.DbNotificationListener"));

     @BeforeEach
@@ -80,6 +83,11 @@
         metaStoreClient = metastore.getMetaStoreClient();
     }

+    @AfterEach
+    public void tearDown() throws TException {
+        metaStoreClient.dropTable(TEST_DATABASE_NAME, TEST_TABLE_NAME);
+    }
+
     private void initUnPartitionedTable() throws Exception {
         createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, Collections.emptyList(), metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME);
     }
@@ -91,7 +99,6 @@
         createPartition(table, Lists.newArrayList("2018", "march"));
     }

-    @DisabledOnOs(WINDOWS)
     @Test
     public void testInsertOnUnPartitionedTable() throws Exception {
         initUnPartitionedTable();
@@ -126,7 +133,6 @@
         assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
     }

-    @DisabledOnOs(WINDOWS)
     @Test
     public void testInsertOnPartitionedTable() throws Exception {
         initPartitionedTable();
@@ -162,7 +168,6 @@
         assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
     }

-    @DisabledOnOs(WINDOWS)
     @Test
     public void testAddPartition() throws Exception {
         initPartitionedTable();
@@ -204,7 +209,6 @@
         assertDoesNotThrow(() -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
     }

-    @DisabledOnOs(WINDOWS)
     @Test
     public void testDropPartition() throws Exception {
         initPartitionedTable();
@@ -246,7 +250,6 @@
         assertThrows(NoSuchObjectException.class, () -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
     }

-    @DisabledOnOs(WINDOWS)
     @Test
     public void testUnknownEventType() throws Exception {
         initUnPartitionedTable();
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index bd4f959c43..fb5f5ce41a 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -22,7 +22,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
@@ -38,6 +37,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -60,18 +60,23 @@ import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateN
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;

+@DisabledOnOs(WINDOWS)
 public class TestPutIcebergWithHiveCatalog {

     private TestRunner runner;
     private PutIceberg processor;
     private Schema inputSchema;
+    private TestHiveCatalogService catalogService;

     @RegisterExtension
-    public ThriftMetastore metastore = new ThriftMetastore();
+    public static ThriftMetastore metastore = new ThriftMetastore();

-    private static final Namespace NAMESPACE = Namespace.of("test_metastore");
+    private static final String CATALOG_NAME = "test_metastore";
+    private static final String TABLE_NAME = "users";

-    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "users");
+    private static final Namespace NAMESPACE = Namespace.of(CATALOG_NAME);
+
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);

     private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
             Types.NestedField.required(1, "id", Types.IntegerType.get()),
@@ -85,6 +90,16 @@ public class TestPutIcebergWithHiveCatalog {
         inputSchema = new Schema.Parser().parse(avroSchema);

         processor = new PutIceberg();
+
+        catalogService = new TestHiveCatalogService.Builder()
+                .withMetastoreUri(metastore.getThriftConnectionUri())
+                .withWarehouseLocation(metastore.getWarehouseLocation())
+                .build();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
     }

     private void initRecordReader() throws InitializationException {
@@ -106,28 +121,19 @@ public class TestPutIcebergWithHiveCatalog {
         runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
     }

-    private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
-        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
-                .withMetastoreUri(metastore.getThriftConnectionUri())
-                .withWarehouseLocation(metastore.getWarehouseLocation())
-                .build();
-        Catalog catalog = catalogService.getCatalog();
-
+    private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
         tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);

-        catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+        catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);

         runner.addControllerService("catalog-service", catalogService);
         runner.enableControllerService(catalogService);

         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
-
-        return catalog;
     }

-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"avro"})
     public void onTriggerPartitioned(String fileFormat) throws Exception {
@@ -137,14 +143,14 @@

         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);

         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -165,7 +171,6 @@
                 "department_bucket=0", "department_bucket=1", "department_bucket=2"));
     }

-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"orc"})
     public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
@@ -175,14 +180,14 @@

         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);

         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -203,7 +208,6 @@
                 "department=Finance", "department=Marketing", "department=Sales"));
     }

-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"parquet"})
     public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
@@ -214,14 +218,14 @@

         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);

         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -246,20 +250,19 @@
                 ));
     }

-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"avro"})
     public void onTriggerUnPartitioned(String fileFormat) throws Exception {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(PartitionSpec.unpartitioned(), fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);

         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")