diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/pom.xml new file mode 100644 index 0000000000..07fd70b9ad --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/pom.xml @@ -0,0 +1,130 @@ + + + + + nifi-hive-bundle + org.apache.nifi + 1.20.0-SNAPSHOT + + 4.0.0 + + nifi-hive-test-utils + jar + + + + org.apache.hive + hive-metastore + ${hive.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-web + + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.orc + orc-core + + + org.apache.hbase + hbase-client + + + co.cask.tephra + tephra-api + + + co.cask.tephra + tephra-core + + + co.cask.tephra + tephra-hbase-compat-1.0 + + + org.apache.parquet + parquet-hadoop-bundle + + + com.tdunning + json + + + com.zaxxer + HikariCP + + + com.google.guava + guava + + + + + org.apache.hive.hcatalog + hive-hcatalog-server-extensions + ${hive.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + org.apache.logging.log4j + log4j-core + + + + + org.junit.jupiter + junit-jupiter-api + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/MetastoreCore.java similarity index 78% rename from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java rename to nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/MetastoreCore.java index 0f9247a19b..08922abfc6 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/MetastoreCore.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/MetastoreCore.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.iceberg.metastore; +package org.apache.nifi.hive.metastore; import org.apache.derby.jdbc.EmbeddedDriver; import org.apache.hadoop.conf.Configuration; @@ -38,17 +38,18 @@ import org.apache.thrift.transport.TTransportFactory; import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.Reader; import java.lang.reflect.InvocationTargetException; -import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,35 +67,36 @@ import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.SCHEM import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS; import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.WAREHOUSE; -/** This class wraps Metastore service core functionalities. 
*/ +/** + * This class wraps Metastore service core functionalities. + */ class MetastoreCore { - private final String DB_NAME = "test_metastore"; + private final String DATABASE_NAME = "test_metastore"; private String thriftConnectionUri; private Configuration hiveConf; - private HiveMetaStoreClient metastoreClient; + private HiveMetaStoreClient metaStoreClient; private File tempDir; private ExecutorService thriftServer; private TServer server; - public void initialize() throws IOException, TException, InvocationTargetException, NoSuchMethodException, + public void initialize(Map configOverrides) throws IOException, TException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException, SQLException { thriftServer = Executors.newSingleThreadExecutor(); tempDir = createTempDirectory("metastore").toFile(); setDerbyLogPath(); setupDB("jdbc:derby:" + getDerbyPath() + ";create=true"); - server = thriftServer(); + server = thriftServer(configOverrides); thriftServer.submit(() -> server.serve()); - metastoreClient = new HiveMetaStoreClient(hiveConf); - metastoreClient.createDatabase(new Database(DB_NAME, "description", getDBPath(), new HashMap<>())); + metaStoreClient = new HiveMetaStoreClient(hiveConf); + metaStoreClient.createDatabase(new Database(DATABASE_NAME, "description", getDBPath(), new HashMap<>())); } public void shutdown() { - - metastoreClient.close(); + metaStoreClient.close(); if (server != null) { server.stop(); @@ -107,7 +109,7 @@ class MetastoreCore { } } - private HiveConf hiveConf(int port) { + private HiveConf hiveConf(int port, Map configOverrides) throws IOException { thriftConnectionUri = "thrift://localhost:" + port; final HiveConf hiveConf = new HiveConf(new Configuration(), this.getClass()); @@ -124,10 +126,12 @@ class MetastoreCore { hiveConf.set(HIVE_SUPPORT_CONCURRENCY.getVarname(), "true"); hiveConf.setBoolean("hcatalog.hive.client.cache.disabled", true); - hiveConf.set(CONNECTION_POOLING_TYPE.getVarname(), "NONE"); hiveConf.set(HMS_HANDLER_FORCE_RELOAD_CONF.getVarname(), "true"); + configOverrides.forEach(hiveConf::set); + + writeHiveConfFile(hiveConf); return hiveConf; } @@ -140,10 +144,10 @@ class MetastoreCore { return new File(tempDir, "metastore_db").getPath(); } - private TServer thriftServer() throws TTransportException, MetaException, InvocationTargetException, - NoSuchMethodException, IllegalAccessException, NoSuchFieldException { + private TServer thriftServer(Map configOverrides) throws TTransportException, MetaException, InvocationTargetException, + NoSuchMethodException, IllegalAccessException, NoSuchFieldException, IOException { final TServerSocketKeepAlive socket = new TServerSocketKeepAlive(new TServerSocket(0)); - hiveConf = hiveConf(socket.getServerSocket().getLocalPort()); + hiveConf = hiveConf(socket.getServerSocket().getLocalPort(), configOverrides); final HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", hiveConf); final IHMSHandler handler = RetryingHMSHandler.getProxy(hiveConf, baseHandler, true); final TTransportFactory transportFactory = new TTransportFactory(); @@ -161,15 +165,20 @@ class MetastoreCore { private void setupDB(String dbURL) throws SQLException, IOException { final Connection connection = DriverManager.getConnection(dbURL); - ScriptRunner scriptRunner = new ScriptRunner(connection); + final ScriptRunner scriptRunner = new ScriptRunner(connection); - final URL initScript = 
getClass().getClassLoader().getResource("hive-schema-4.0.0-alpha-2.derby.sql"); - final Reader reader = new BufferedReader(new FileReader(initScript.getFile())); + final InputStream inputStream = getClass().getClassLoader().getResourceAsStream("hive-schema-4.0.0-alpha-2.derby.sql"); + final Reader reader = new BufferedReader(new InputStreamReader(inputStream)); scriptRunner.runScript(reader); } private String getDBPath() { - return Paths.get(tempDir.getAbsolutePath(), DB_NAME + ".db").toAbsolutePath().toString(); + return Paths.get(tempDir.getAbsolutePath(), DATABASE_NAME + ".db").toAbsolutePath().toString(); + } + + private void writeHiveConfFile(HiveConf hiveConf) throws IOException { + File file = new File(tempDir.toPath() + "/hive-site.xml"); + hiveConf.writeXml(Files.newOutputStream(file.toPath())); } public String getThriftConnectionUri() { @@ -180,5 +189,17 @@ class MetastoreCore { return tempDir.getAbsolutePath(); } + public HiveMetaStoreClient getMetaStoreClient() { + return metaStoreClient; + } + + public Configuration getConfiguration() { + return hiveConf; + } + + public String getConfigurationLocation() { + return tempDir.toPath() + "/hive-site.xml"; + } + } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java similarity index 98% rename from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java rename to nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java index d1e7e1808e..4e4c65d043 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ScriptRunner.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.iceberg.metastore; +package org.apache.nifi.hive.metastore; import java.io.IOException; import java.io.LineNumberReader; diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java similarity index 68% rename from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java rename to nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java index 910c2df176..5e1425d667 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/metastore/ThriftMetastore.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java @@ -15,24 +15,36 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.iceberg.metastore; +package org.apache.nifi.hive.metastore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; +import java.util.HashMap; +import java.util.Map; + /** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback { private final MetastoreCore metastoreCore; + private Map configOverrides = new HashMap<>(); + public ThriftMetastore() { metastoreCore = new MetastoreCore(); } + public ThriftMetastore withConfigOverrides(Map configs) { + configOverrides = configs; + return this; + } + @Override public void beforeEach(ExtensionContext context) throws Exception { - metastoreCore.initialize(); + metastoreCore.initialize(configOverrides); } @Override @@ -47,4 +59,17 @@ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback { public String getWarehouseLocation() { return metastoreCore.getWarehouseLocation(); } + + public HiveMetaStoreClient getMetaStoreClient() { + return metastoreCore.getMetaStoreClient(); + } + + public Configuration getConfiguration() { + return metastoreCore.getConfiguration(); + } + + public String getConfigurationLocation() { + return metastoreCore.getConfigurationLocation(); + } + } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-4.0.0-alpha-2.derby.sql b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/resources/hive-schema-4.0.0-alpha-2.derby.sql similarity index 90% rename from nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-4.0.0-alpha-2.derby.sql rename to nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/resources/hive-schema-4.0.0-alpha-2.derby.sql index b6beb25324..c1cc235a6f 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/hive-schema-4.0.0-alpha-2.derby.sql +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/resources/hive-schema-4.0.0-alpha-2.derby.sql @@ -770,22 +770,6 @@ CREATE TABLE "APP"."SCHEMA_VERSION" ( CREATE UNIQUE INDEX "APP"."UNIQUE_SCHEMA_VERSION" ON "APP"."SCHEMA_VERSION" ("SCHEMA_ID", "VERSION"); -CREATE TABLE REPL_TXN_MAP ( - RTM_REPL_POLICY varchar(256) NOT NULL, - RTM_SRC_TXN_ID bigint NOT NULL, - RTM_TARGET_TXN_ID bigint NOT NULL, - PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID) -); - -CREATE TABLE "APP"."RUNTIME_STATS" ( - "RS_ID" bigint primary key, - "CREATE_TIME" integer not null, - "WEIGHT" integer not null, - "PAYLOAD" BLOB -); - -CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); - CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( WNL_ID bigint NOT NULL, WNL_TXNID bigint NOT NULL, @@ -801,88 +785,6 @@ CREATE TABLE TXN_WRITE_NOTIFICATION_LOG ( ); INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1); -CREATE TABLE "APP"."SCHEDULED_QUERIES" ( - "SCHEDULED_QUERY_ID" bigint primary key not null, - "SCHEDULE_NAME" varchar(256) not null, - "ENABLED" CHAR(1) NOT NULL DEFAULT 'N', - "CLUSTER_NAMESPACE" varchar(256) not null, - "USER" varchar(128) not null, - "SCHEDULE" varchar(256) not null, - "QUERY" varchar(4000) not null, - 
"NEXT_EXECUTION" integer, - "ACTIVE_EXECUTION_ID" bigint -); - -CREATE INDEX NEXTEXECUTIONINDEX ON APP.SCHEDULED_QUERIES (ENABLED,CLUSTER_NAMESPACE,NEXT_EXECUTION); -CREATE UNIQUE INDEX UNIQUE_SCHEDULED_QUERIES_NAME ON APP.SCHEDULED_QUERIES (SCHEDULE_NAME,CLUSTER_NAMESPACE); - -CREATE TABLE APP.SCHEDULED_EXECUTIONS ( - SCHEDULED_EXECUTION_ID bigint primary key not null, - EXECUTOR_QUERY_ID VARCHAR(256), - SCHEDULED_QUERY_ID bigint not null, - START_TIME integer not null, - END_TIME INTEGER, - LAST_UPDATE_TIME INTEGER, - ERROR_MESSAGE VARCHAR(2000), - STATE VARCHAR(256), - CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY (SCHEDULED_QUERY_ID) REFERENCES APP.SCHEDULED_QUERIES(SCHEDULED_QUERY_ID) ON DELETE CASCADE -); - -CREATE INDEX LASTUPDATETIMEINDEX ON APP.SCHEDULED_EXECUTIONS (LAST_UPDATE_TIME); -CREATE INDEX SCHEDULED_EXECUTIONS_SCHQID ON APP.SCHEDULED_EXECUTIONS (SCHEDULED_QUERY_ID); -CREATE UNIQUE INDEX SCHEDULED_EXECUTIONS_UNIQUE_ID ON APP.SCHEDULED_EXECUTIONS (SCHEDULED_EXECUTION_ID); - ---HIVE-23516 -CREATE TABLE "APP"."REPLICATION_METRICS" ( - "RM_SCHEDULED_EXECUTION_ID" bigint NOT NULL, - "RM_POLICY" varchar(256) NOT NULL, - "RM_DUMP_EXECUTION_ID" bigint NOT NULL, - "RM_METADATA" varchar(4000), - "RM_PROGRESS" varchar(10000), - "RM_START_TIME" integer not null, - "MESSAGE_FORMAT" VARCHAR(16) DEFAULT 'json-0.2', - PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID") -); - -CREATE INDEX "POLICY_IDX" ON "APP"."REPLICATION_METRICS" ("RM_POLICY"); -CREATE INDEX "DUMP_IDX" ON "APP"."REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID"); - --- Create stored procedure tables -CREATE TABLE "APP"."STORED_PROCS" ( - "SP_ID" BIGINT NOT NULL, - "CREATE_TIME" INTEGER NOT NULL, - "DB_ID" BIGINT NOT NULL, - "NAME" VARCHAR(256) NOT NULL, - "OWNER_NAME" VARCHAR(128) NOT NULL, - "SOURCE" clob NOT NULL, - PRIMARY KEY ("SP_ID") -); - -CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID"); -ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID"); - -CREATE TABLE "APP"."DATACONNECTORS" ("NAME" VARCHAR(128) NOT NULL, "TYPE" VARCHAR(32) NOT NULL, "URL" VARCHAR(4000) NOT NULL, "COMMENT" VARCHAR(256), "OWNER_NAME" VARCHAR(256), "OWNER_TYPE" VARCHAR(10), "CREATE_TIME" INTEGER NOT NULL); -CREATE TABLE "APP"."DATACONNECTOR_PARAMS" ("NAME" VARCHAR(128) NOT NULL, "PARAM_KEY" VARCHAR(180) NOT NULL, "PARAM_VALUE" VARCHAR(4000)); -ALTER TABLE "APP"."DATACONNECTORS" ADD CONSTRAINT "DATACONNECTORS_KEY_PK" PRIMARY KEY ("NAME"); -ALTER TABLE "APP"."DATACONNECTOR_PARAMS" ADD CONSTRAINT "DATACONNECTOR_PARAMS_KEY_PK" PRIMARY KEY ("NAME", "PARAM_KEY"); -ALTER TABLE "APP"."DATACONNECTOR_PARAMS" ADD CONSTRAINT "NAME_FK1" FOREIGN KEY ("NAME") REFERENCES "APP"."DATACONNECTORS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION; -ALTER TABLE "APP"."DC_PRIVS" ADD CONSTRAINT "DC_PRIVS_FK1" FOREIGN KEY ("NAME") REFERENCES "APP"."DATACONNECTORS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION; - --- Create stored procedure packages -CREATE TABLE "APP"."PACKAGES" ( - "PKG_ID" BIGINT NOT NULL, - "CREATE_TIME" INTEGER NOT NULL, - "DB_ID" BIGINT NOT NULL, - "NAME" VARCHAR(256) NOT NULL, - "OWNER_NAME" VARCHAR(128) NOT NULL, - "HEADER" clob NOT NULL, - "BODY" clob NOT NULL, - PRIMARY KEY ("PKG_ID") -); - -CREATE UNIQUE INDEX "UNIQUEPKG" ON "PACKAGES" ("NAME", "DB_ID"); -ALTER TABLE "PACKAGES" ADD CONSTRAINT "PACKAGES_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID"); - -- ----------------------------------------------------------------- -- Record schema version. 
Should be the last step in the init script -- ----------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml index b75e610f37..8aa616f19b 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml @@ -371,6 +371,12 @@ 1.20.0-SNAPSHOT test + + org.apache.nifi + nifi-hive-test-utils + 1.20.0-SNAPSHOT + test + diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/TriggerHiveMetaStoreEvent.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/TriggerHiveMetaStoreEvent.java new file mode 100644 index 0000000000..89961a023e --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/TriggerHiveMetaStoreEvent.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosLoginException;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.ValidationResources;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"hive", "metastore", "notification", "insert", "delete", "partition", "event"})
+@CapabilityDescription("This processor can trigger different types of events in the Hive Metastore and generate the corresponding notifications. " +
+        "The metastore action to be executed is determined from the incoming path and event type attributes. " +
+        "The supported event type values are 'put' in case of data insertion or 'delete' in case of data removal. " +
+        "Notifications must be enabled in the metastore configuration in order to be generated, e.g. the 'hive.metastore.transactional.event.listeners' " +
+        "property should have a proper listener configured, for instance 'org.apache.hive.hcatalog.listener.DbNotificationListener'.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "metastore.notification.event", description = "The event type of the triggered notification.")
+})
+public class TriggerHiveMetaStoreEvent extends AbstractProcessor {
+
+    public static final String METASTORE_NOTIFICATION_EVENT = "metastore.notification.event";
+
+    private final HiveConfigurator hiveConfigurator = new HiveConfigurator();
+
+    private Configuration hiveConfig;
+
+    private final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
+    private volatile UserGroupInformation ugi;
+
+    // Holder of cached Configuration information so validation does not reload the same config over and over
+    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+
+    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+            .name("hive-metastore-uri")
+            .displayName("Hive Metastore URI")
+            .description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. " +
+                    "The default port for the Hive metastore is 9083. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources " +
+                    "will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.URI_LIST_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma-separated list of files which contain the Hive configuration (e.g. hive-site.xml). Without this, Hadoop " +
+                    "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication " +
+                    "with Kerberos, for example, the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor EVENT_TYPE = new PropertyDescriptor.Builder()
+            .name("event-type")
+            .displayName("Event Type")
+            .description("The type of the event. 
The acceptable values are 'put' in case of data insertion and 'delete' in case of data removal.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("${event.type}")
+            .build();
+
+    static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
+            .name("path")
+            .displayName("Path")
+            .description("The path of the file or folder located in the file system.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("${path}")
+            .build();
+
+    static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
+            .name("catalog-name")
+            .displayName("Catalog Name")
+            .description("The name of the catalog.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+            .name("database-name")
+            .displayName("Database Name")
+            .description("The name of the database.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Table Name")
+            .description("The name of the table.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
+            .identifiesControllerService(KerberosUserService.class)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the metastore event has been triggered successfully.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the metastore event cannot be triggered and retrying the operation will also fail, such as with invalid data or schema.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            METASTORE_URI,
+            HIVE_CONFIGURATION_RESOURCES,
+            EVENT_TYPE,
+            PATH,
+            CATALOG_NAME,
+            DATABASE_NAME,
+            TABLE_NAME,
+            KERBEROS_USER_SERVICE
+    ));
+
+    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>();
+        final boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
+
+        if (confFileProvided) {
+            final boolean kerberosUserServiceIsSet = validationContext.getProperty(KERBEROS_USER_SERVICE).isSet();
+            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
+
final Configuration config = hiveConfigurator.getConfigurationForValidation(validationResourceHolder, configFiles, getLogger()); + + final boolean securityEnabled = SecurityUtil.isSecurityEnabled(config); + if (securityEnabled && !kerberosUserServiceIsSet) { + problems.add(new ValidationResult.Builder() + .subject(KERBEROS_USER_SERVICE.getDisplayName()) + .valid(false) + .explanation("Security authentication is set to 'kerberos' in the configuration files but no KerberosUserService is configured.") + .build()); + } + } + + return problems; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); + hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles); + + if (context.getProperty(METASTORE_URI).isSet()) { + hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue()); + } + + final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class); + + if (kerberosUserService != null) { + kerberosUserReference.set(kerberosUserService.createKerberosUser()); + try { + ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get()); + } catch (AuthenticationFailedException e) { + getLogger().error(e.getMessage(), e); + throw new ProcessException(e); + } + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final KerberosUser kerberosUser = kerberosUserReference.get(); + if (kerberosUser == null) { + doOnTrigger(context, session, flowFile); + } else { + try { + getUgi().doAs((PrivilegedExceptionAction) () -> { + doOnTrigger(context, session, flowFile); + return null; + }); + + } catch (Exception e) { + getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e); + session.transfer(flowFile, REL_FAILURE); + } + } + } + + public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException { + String catalogName; + if (context.getProperty(CATALOG_NAME).isSet()) { + catalogName = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue(); + } else { + catalogName = MetaStoreUtils.getDefaultCatalog(hiveConfig); + } + + final String eventType = context.getProperty(EVENT_TYPE).evaluateAttributeExpressions(flowFile).getValue(); + final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String path = context.getProperty(PATH).evaluateAttributeExpressions(flowFile).getValue(); + + try (final HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConfig)) { + final Table table = metaStoreClient.getTable(catalogName, databaseName, tableName); + final boolean isPartitioned = !table.getPartitionKeys().isEmpty(); + + switch (eventType.toLowerCase().trim()) { + case "put": + if (isPartitioned) { + EventMessage.EventType eventMessageType = handlePartitionedInsert(metaStoreClient, catalogName, databaseName, tableName, path); + flowFile = session.putAttribute(flowFile, METASTORE_NOTIFICATION_EVENT, eventMessageType.toString()); + } else { + handleFileInsert(metaStoreClient, catalogName, 
databaseName, tableName, path, null); + flowFile = session.putAttribute(flowFile, METASTORE_NOTIFICATION_EVENT, EventMessage.EventType.INSERT.toString()); + } + break; + case "delete": + if (isPartitioned) { + handleDropPartition(metaStoreClient, catalogName, databaseName, tableName, path); + flowFile = session.putAttribute(flowFile, METASTORE_NOTIFICATION_EVENT, EventMessage.EventType.DROP_PARTITION.toString()); + } else { + getLogger().warn("The target table '{}' is not partitioned. No metastore action was executed.", tableName); + } + break; + default: + getLogger().error("Unknown event type '{}'", eventType); + session.transfer(flowFile, REL_FAILURE); + return; + } + } catch (Exception e) { + getLogger().error("Error occurred while metastore event processing", e); + session.transfer(flowFile, REL_FAILURE); + return; + } + + session.transfer(flowFile, REL_SUCCESS); + } + + /** + * Handles an insert event for partitioned table. If the partition already exists then an insert event will be executed in the metastore else a new partition will be added to the table. + * + * @param catalogName name of the catalog + * @param databaseName name of the database + * @param tableName name of the table + * @param path path for the file or directory + * @return type of the executed event + * @throws TException thrown if the metastore action fails + */ + private EventMessage.EventType handlePartitionedInsert(HiveMetaStoreClient metaStoreClient, String catalogName, + String databaseName, String tableName, String path) throws TException { + final List partitionValues = getPartitionValuesFromPath(path); + + try { + // Check if the partition already exists for the file to be inserted + metaStoreClient.getPartition(databaseName, tableName, partitionValues); + getLogger().debug("Creating file insert for partition with values {}", partitionValues); + handleFileInsert(metaStoreClient, catalogName, databaseName, tableName, path, partitionValues); + return EventMessage.EventType.INSERT; + } catch (Exception e) { + if (e instanceof NoSuchObjectException) { + getLogger().debug("Partition with values {} does not exists. Trying to append new partition", partitionValues, e); + try { + metaStoreClient.appendPartition(catalogName, databaseName, tableName, partitionValues); + return EventMessage.EventType.ADD_PARTITION; + } catch (TException ex) { + throw new TException("Failed to append partition with values " + partitionValues, ex); + } + } + throw new TException("Error occurred during partitioned file insertion with values " + partitionValues, e); + } + } + + /** + * Parse the partition values from the file or directory path. + * e.g.: hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/file -> partition values are [5,10] + * + * @param path path for the file or directory + * @return partition values + */ + private List getPartitionValuesFromPath(String path) { + final String[] pathParts = path.split("/"); + List partitionValues = new ArrayList<>(); + for (String pathPart : pathParts) { + if (pathPart.contains("=")) { + final String[] partitionParts = pathPart.split("="); + partitionValues.add(partitionParts[1]); + } + } + getLogger().debug("The following partition values were processed from path '{}': {}", path, partitionValues); + return partitionValues; + } + + /** + * Constructs a file insert event and send it into the metastore. 
+ * + * @param catalogName name of the catalog + * @param databaseName name of the database + * @param tableName name of the table + * @param path path for the file or directory + * @param partitionValues partition values + * @throws IOException thrown if there are any error with the checksum generation + * @throws TException thrown if the metastore action fails + */ + private void handleFileInsert(HiveMetaStoreClient metaStoreClient, String catalogName, String databaseName, + String tableName, String path, List partitionValues) throws IOException, TException { + final InsertEventRequestData insertEventRequestData = new InsertEventRequestData(); + insertEventRequestData.setReplace(false); + insertEventRequestData.addToFilesAdded(path); + insertEventRequestData.addToFilesAddedChecksum(checksumFor(path)); + + final FireEventRequestData fireEventRequestData = new FireEventRequestData(); + fireEventRequestData.setInsertData(insertEventRequestData); + + final FireEventRequest fireEventRequest = new FireEventRequest(true, fireEventRequestData); + fireEventRequest.setCatName(catalogName); + fireEventRequest.setDbName(databaseName); + fireEventRequest.setTableName(tableName); + if (partitionValues != null) { + fireEventRequest.setPartitionVals(partitionValues); + } + + metaStoreClient.fireListenerEvent(fireEventRequest); + } + + /** + * Triggers a drop partition event in the metastore with the partition values parsed from the provided path. + * + * @param catalogName name of the catalog + * @param databaseName name of the database + * @param tableName name of the table + * @param path path for the file or directory + * @throws TException thrown if the metastore action fails + */ + private void handleDropPartition(HiveMetaStoreClient metaStoreClient, String catalogName, String databaseName, + String tableName, String path) throws TException { + final List partitionValues = getPartitionValuesFromPath(path); + try { + metaStoreClient.dropPartition(catalogName, databaseName, tableName, partitionValues, true); + } catch (TException e) { + if (e instanceof NoSuchObjectException) { + getLogger().error("Failed to drop partition. Partition with values {} does not exists.", partitionValues, e); + } + throw new TException(e); + } + } + + /** + * Generates checksum for file on the given path if the FileSystem supports it. + * + * @param filePath path for the file + * @return checksum for the file. + * @throws IOException thrown if there are any issue getting the FileSystem + */ + private String checksumFor(String filePath) throws IOException { + final Path path = new Path(filePath); + final FileSystem fileSystem = path.getFileSystem(hiveConfig); + final FileChecksum checksum = fileSystem.getFileChecksum(path); + if (checksum != null) { + return StringUtils.byteToHexString(checksum.getBytes(), 0, checksum.getLength()); + } + return ""; + } + + /** + * Checks if the user TGT expired and performs a re-login if needed. 
+ * + * @return ugi + */ + private UserGroupInformation getUgi() { + KerberosUser kerberosUser = kerberosUserReference.get(); + try { + kerberosUser.checkTGTAndRelogin(); + } catch (KerberosLoginException e) { + throw new ProcessException("Unable to re-login with kerberos credentials for " + kerberosUser.getPrincipal(), e); + } + return ugi; + } +} diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java index 38329449de..38b553378a 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java @@ -31,7 +31,6 @@ import org.apache.nifi.security.krb.KerberosUser; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.concurrent.atomic.AtomicReference; /** @@ -42,7 +41,12 @@ public class HiveConfigurator { public Collection validate(String configFiles, String principal, String keyTab, String password, AtomicReference validationResourceHolder, ComponentLog log) { - final List problems = new ArrayList<>(); + final Configuration hiveConfig = getConfigurationForValidation(validationResourceHolder, configFiles, log); + + return new ArrayList<>(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log)); + } + + public Configuration getConfigurationForValidation(AtomicReference validationResourceHolder, String configFiles, ComponentLog log) { ValidationResources resources = validationResourceHolder.get(); // if no resources in the holder, or if the holder has different resources loaded, @@ -53,11 +57,7 @@ public class HiveConfigurator { validationResourceHolder.set(resources); } - final Configuration hiveConfig = resources.getConfiguration(); - - problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log)); - - return problems; + return resources.getConfiguration(); } public HiveConf getConfigurationFromFiles(final String configFiles) { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 150d1f351d..9087c4e24d 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -17,3 +17,4 @@ org.apache.nifi.processors.hive.PutHive3QL org.apache.nifi.processors.hive.PutHive3Streaming org.apache.nifi.processors.orc.PutORC org.apache.nifi.processors.hive.UpdateHive3Table +org.apache.nifi.processors.hive.TriggerHiveMetaStoreEvent \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/docs/org/apache/nifi/processors/hive/TriggerHiveMetaStoreEvent/additionalDetails.html 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/docs/org/apache/nifi/processors/hive/TriggerHiveMetaStoreEvent/additionalDetails.html new file mode 100644 index 0000000000..72aff521c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/resources/docs/org/apache/nifi/processors/hive/TriggerHiveMetaStoreEvent/additionalDetails.html @@ -0,0 +1,52 @@ + + + + + + + TriggerHiveMetaStoreEvent + + + + + +

TriggerHiveMetaStoreEvent

+ +

Event Types

+ +

+ The processor supports the following event types: +

+
Put
+
+    The put event type represents a file or directory insertion. The metastore action depends on the target table. If the table is partitioned,
+    the processor parses the target partition values from the path, e.g. if the inserted file's path is 'hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/file',
+    then the partition values are taken from the '/a=5/b=10/' path segments -> [5,10]. If the parsed partition does not exist, an 'APPEND PARTITION' event is fired
+    and the new partition is registered in the metastore. If the partition already exists, only an 'INSERT' event is fired. A sketch of the parsing logic follows below.
+
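As an illustration, here is a minimal, self-contained sketch of the path parsing described above. The class and method names are illustrative only; the processor implements equivalent logic in its private getPartitionValuesFromPath method.

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionPathParsingExample {

        // Collects the value of every 'key=value' path segment, in path order.
        static List<String> partitionValuesFromPath(String path) {
            final List<String> values = new ArrayList<>();
            for (String segment : path.split("/")) {
                final int separator = segment.indexOf('=');
                if (separator >= 0) {
                    values.add(segment.substring(separator + 1));
                }
            }
            return values;
        }

        public static void main(String[] args) {
            // Prints [5, 10]
            System.out.println(partitionValuesFromPath("hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/file"));
        }
    }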
+
Delete
+
+    The delete event type represents a 'DROP PARTITION' metastore action. The processor parses the target partition values from the path,
+    e.g. if the deleted directory's path is 'hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/', then the partition values are taken from the '/a=5/b=10/' path segments -> [5,10].
+    If the table is not partitioned, no metastore action is executed. See the sketch after this paragraph for the underlying metastore call.
+
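For reference, a minimal sketch of the metastore call behind the delete event, assuming a reachable metastore; the URI, catalog, database, and table names here are placeholders. The processor issues the same dropPartition call with the partition values it parsed from the path.

    import java.util.Arrays;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    public class DropPartitionExample {

        public static void main(String[] args) throws Exception {
            final HiveConf conf = new HiveConf();
            conf.set("hive.metastore.uris", "thrift://localhost:9083"); // placeholder URI
            try (HiveMetaStoreClient client = new HiveMetaStoreClient(conf)) {
                // Drops partition a=5/b=10 of 'default.table' and deletes its data (deleteData = true);
                // a DROP_PARTITION notification is generated when event listeners are configured.
                client.dropPartition("hive", "default", "table", Arrays.asList("5", "10"), true);
            }
        }
    }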
+
+
+    Any other event type causes an error, and the FlowFile is routed to the failure relationship of the processor.
+
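The snippet below sketches how a put event is fired for an unpartitioned table, using the same metastore API calls as the processor; the connection setup, database, table, and file path are placeholders.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.FireEventRequest;
    import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
    import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;

    public class FireInsertEventExample {

        public static void main(String[] args) throws Exception {
            final HiveConf conf = new HiveConf();
            conf.set("hive.metastore.uris", "thrift://localhost:9083"); // placeholder URI
            try (HiveMetaStoreClient client = new HiveMetaStoreClient(conf)) {
                // Describe the inserted file; the processor additionally attaches a file checksum when the file system provides one.
                final InsertEventRequestData insertData = new InsertEventRequestData();
                insertData.setReplace(false);
                insertData.addToFilesAdded("hdfs://localhost:9000/user/hive/warehouse/table/test_file");

                final FireEventRequestData data = new FireEventRequestData();
                data.setInsertData(insertData);

                // The 'true' flag marks the operation as successful, matching the processor's usage.
                final FireEventRequest request = new FireEventRequest(true, data);
                request.setDbName("default");
                request.setTableName("table");

                // Generates an INSERT notification in the metastore.
                client.fireListenerEvent(request);
            }
        }
    }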

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java new file mode 100644 index 0000000000..0cd52f801c --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hive; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.nifi.hive.metastore.ThriftMetastore; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS; +import static org.apache.nifi.processors.hive.TriggerHiveMetaStoreEvent.METASTORE_NOTIFICATION_EVENT; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.condition.OS.WINDOWS; + +public class TestTriggerHiveMetaStoreEvent { + + private TestRunner runner; + private 
TriggerHiveMetaStoreEvent processor; + private HiveMetaStoreClient metaStoreClient; + + private static final String TEST_DATABASE_NAME = "test_metastore"; + private static final String TEST_TABLE_NAME = "test_table"; + + private static final List PARTITION_COLUMNS = Arrays.asList( + new FieldSchema("year", "string", "year partition column"), + new FieldSchema("month", "string", "month partition column") + ); + + @RegisterExtension + public ThriftMetastore metastore = new ThriftMetastore() + .withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(), "org.apache.hive.hcatalog.listener.DbNotificationListener")); + + @BeforeEach + public void setUp() { + processor = new TriggerHiveMetaStoreEvent(); + metaStoreClient = metastore.getMetaStoreClient(); + } + + private void initUnPartitionedTable() throws Exception { + createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, Collections.emptyList(), metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME); + } + + private void initPartitionedTable() throws Exception { + Table table = createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, PARTITION_COLUMNS, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME); + createPartition(table, Lists.newArrayList("2017", "march")); + createPartition(table, Lists.newArrayList("2017", "april")); + createPartition(table, Lists.newArrayList("2018", "march")); + } + + @DisabledOnOs(WINDOWS) + @Test + public void testInsertOnUnPartitionedTable() throws Exception { + initUnPartitionedTable(); + + runner = TestRunners.newTestRunner(processor); + runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation()); + runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "put"); + runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/test_file"); + runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME); + runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME); + + runner.setValidateExpressionUsage(false); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0); + + Assertions.assertEquals(EventMessage.EventType.INSERT.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT)); + + CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId(); + NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null); + NotificationEvent event = response.getEvents().get(0); + + InsertMessage insertMessage = MessageFactory.getInstance().getDeserializer().getInsertMessage(event.getMessage()); + insertMessage.getFiles().forEach(s -> assertEquals(s, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/test_file###")); + + assertEquals(event.getEventType(), EventMessage.EventType.INSERT.toString()); + assertEquals(event.getDbName(), TEST_DATABASE_NAME); + assertEquals(insertMessage.getDB(), TEST_DATABASE_NAME); + assertEquals(event.getTableName(), TEST_TABLE_NAME); + assertEquals(insertMessage.getTable(), TEST_TABLE_NAME); + } + + @DisabledOnOs(WINDOWS) + @Test + public void testInsertOnPartitionedTable() throws Exception { + initPartitionedTable(); + assertEquals(PARTITION_COLUMNS, metaStoreClient.getTable(TEST_DATABASE_NAME, TEST_TABLE_NAME).getPartitionKeys()); + + runner = TestRunners.newTestRunner(processor); + 
runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation()); + runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "put"); + runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=march/test_file"); + runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME); + runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME); + + runner.setValidateExpressionUsage(false); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0); + + Assertions.assertEquals(EventMessage.EventType.INSERT.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT)); + + CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId(); + NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null); + NotificationEvent event = response.getEvents().get(0); + + InsertMessage insertMessage = MessageFactory.getInstance().getDeserializer().getInsertMessage(event.getMessage()); + insertMessage.getFiles().forEach(s -> assertEquals(s, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=march/test_file###")); + + assertEquals(event.getEventType(), EventMessage.EventType.INSERT.toString()); + assertEquals(event.getDbName(), TEST_DATABASE_NAME); + assertEquals(insertMessage.getDB(), TEST_DATABASE_NAME); + assertEquals(event.getTableName(), TEST_TABLE_NAME); + assertEquals(insertMessage.getTable(), TEST_TABLE_NAME); + } + + @DisabledOnOs(WINDOWS) + @Test + public void testAddPartition() throws Exception { + initPartitionedTable(); + assertEquals(PARTITION_COLUMNS, metaStoreClient.getTable(TEST_DATABASE_NAME, TEST_TABLE_NAME).getPartitionKeys()); + + runner = TestRunners.newTestRunner(processor); + runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation()); + runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "put"); + runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=june/test_file"); + runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME); + runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME); + + runner.setValidateExpressionUsage(false); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0); + + Assertions.assertEquals(EventMessage.EventType.ADD_PARTITION.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT)); + + CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId(); + NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null); + NotificationEvent event = response.getEvents().get(0); + + AddPartitionMessage addPartitionMessage = MessageFactory.getInstance().getDeserializer().getAddPartitionMessage(event.getMessage()); + Map expectedPartition = new HashMap() {{ + put("year", "2017"); + put("month", "june"); + }}; + + assertEquals(addPartitionMessage.getPartitions().get(0), expectedPartition); + assertEquals(event.getEventType(), 
EventMessage.EventType.ADD_PARTITION.toString());
+        assertEquals(event.getDbName(), TEST_DATABASE_NAME);
+        assertEquals(addPartitionMessage.getDB(), TEST_DATABASE_NAME);
+        assertEquals(event.getTableName(), TEST_TABLE_NAME);
+        assertEquals(addPartitionMessage.getTable(), TEST_TABLE_NAME);
+
+        assertDoesNotThrow(() -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testDropPartition() throws Exception {
+        initPartitionedTable();
+        assertEquals(PARTITION_COLUMNS, metaStoreClient.getTable(TEST_DATABASE_NAME, TEST_TABLE_NAME).getPartitionKeys());
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
+        runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "delete");
+        runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=march/test_file");
+        runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
+        runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
+
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0);
+
+        Assertions.assertEquals(EventMessage.EventType.DROP_PARTITION.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT));
+
+        CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId();
+        NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null);
+        NotificationEvent event = response.getEvents().get(0);
+
+        DropPartitionMessage dropPartitionMessage = MessageFactory.getInstance().getDeserializer().getDropPartitionMessage(event.getMessage());
+        Map<String, String> expectedPartition = new HashMap<String, String>() {{
+            put("year", "2017");
+            put("month", "march");
+        }};
+
+        assertEquals(dropPartitionMessage.getPartitions().get(0), expectedPartition);
+        assertEquals(event.getEventType(), EventMessage.EventType.DROP_PARTITION.toString());
+        assertEquals(event.getDbName(), TEST_DATABASE_NAME);
+        assertEquals(dropPartitionMessage.getDB(), TEST_DATABASE_NAME);
+        assertEquals(event.getTableName(), TEST_TABLE_NAME);
+        assertEquals(dropPartitionMessage.getTable(), TEST_TABLE_NAME);
+
+        // The dropped partition (year=2017/month=march) must no longer exist in the metastore
+        assertThrows(NoSuchObjectException.class, () -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "march")));
+    }
+
+    @DisabledOnOs(WINDOWS)
+    @Test
+    public void testUnknownEventType() throws Exception {
+        initUnPartitionedTable();
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
+        runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "unknown");
+        runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/test_file");
+        runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
+        runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
+
+        runner.setValidateExpressionUsage(false);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_FAILURE, 1);
+    }
+
+    private Table createTable(String databaseName, String tableName, 
List<FieldSchema> partitionColumns, String location) throws Exception {
+        return new TableBuilder()
+                .setDbName(databaseName)
+                .setTableName(tableName)
+                .addCol("id", "int", "id column")
+                .addCol("value", "string", "value column")
+                .setPartCols(partitionColumns)
+                .setType(TableType.EXTERNAL_TABLE.name())
+                .setLocation(location)
+                .create(metaStoreClient, metastore.getConfiguration());
+    }
+
+    private void createPartition(Table table, List<String> values) throws Exception {
+        new PartitionBuilder()
+                .inTable(table)
+                .setValues(values)
+                .addToTable(metaStoreClient, metastore.getConfiguration());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/pom.xml
index 80c88ba728..e2c48dc7ba 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/pom.xml
@@ -35,6 +35,7 @@
         <module>nifi-hive_1_1-nar</module>
         <module>nifi-hive3-processors</module>
         <module>nifi-hive3-nar</module>
+        <module>nifi-hive-test-utils</module>
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
index 89bb113e73..78ec2ef016 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -171,6 +171,12 @@
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-hive-test-utils</artifactId>
+            <version>1.20.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-avro-record-utils</artifactId>
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index 1f7a79258b..62071102e6 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -28,8 +28,8 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.hive.metastore.ThriftMetastore;
 import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
-import org.apache.nifi.processors.iceberg.metastore.ThriftMetastore;
 import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
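For consumers of the new nifi-hive-test-utils module, a minimal usage sketch of the ThriftMetastore JUnit 5 extension follows; the test class and assertion are illustrative, while the extension API (withConfigOverrides, getMetaStoreClient, getThriftConnectionUri) comes from this patch.

    import java.util.Collections;

    import org.apache.nifi.hive.metastore.ThriftMetastore;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;

    import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    class MetastoreExtensionUsageExample {

        // Starts an embedded, Derby-backed metastore before each test and shuts it down afterwards.
        @RegisterExtension
        ThriftMetastore metastore = new ThriftMetastore()
                .withConfigOverrides(Collections.singletonMap(
                        TRANSACTIONAL_EVENT_LISTENERS.getVarname(),
                        "org.apache.hive.hcatalog.listener.DbNotificationListener"));

        @Test
        void clientIsAvailable() {
            assertNotNull(metastore.getMetaStoreClient());
            assertNotNull(metastore.getThriftConnectionUri());
        }
    }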