NIFI-11110: Create processor for triggering HMS events

Review comment fixes

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6902
Mark Bathori 2023-01-29 20:31:11 +01:00 committed by Matthew Burgess
parent 653631cc67
commit 869e2b3d66
14 changed files with 1022 additions and 130 deletions

View File

@@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-hive-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.20.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-hive-test-utils</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-api</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-core</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-hbase-compat-1.0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-server-extensions</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg.metastore;
package org.apache.nifi.hive.metastore;
import org.apache.derby.jdbc.EmbeddedDriver;
import org.apache.hadoop.conf.Configuration;
@@ -38,17 +38,18 @@ import org.apache.thrift.transport.TTransportFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -66,35 +67,36 @@ import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.SCHEM
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.THRIFT_URIS;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.WAREHOUSE;
/** This class wraps Metastore service core functionalities. */
/**
* This class wraps Metastore service core functionalities.
*/
class MetastoreCore {
private final String DB_NAME = "test_metastore";
private final String DATABASE_NAME = "test_metastore";
private String thriftConnectionUri;
private Configuration hiveConf;
private HiveMetaStoreClient metastoreClient;
private HiveMetaStoreClient metaStoreClient;
private File tempDir;
private ExecutorService thriftServer;
private TServer server;
public void initialize() throws IOException, TException, InvocationTargetException, NoSuchMethodException,
public void initialize(Map<String, String> configOverrides) throws IOException, TException, InvocationTargetException, NoSuchMethodException,
IllegalAccessException, NoSuchFieldException, SQLException {
thriftServer = Executors.newSingleThreadExecutor();
tempDir = createTempDirectory("metastore").toFile();
setDerbyLogPath();
setupDB("jdbc:derby:" + getDerbyPath() + ";create=true");
server = thriftServer();
server = thriftServer(configOverrides);
thriftServer.submit(() -> server.serve());
metastoreClient = new HiveMetaStoreClient(hiveConf);
metastoreClient.createDatabase(new Database(DB_NAME, "description", getDBPath(), new HashMap<>()));
metaStoreClient = new HiveMetaStoreClient(hiveConf);
metaStoreClient.createDatabase(new Database(DATABASE_NAME, "description", getDBPath(), new HashMap<>()));
}
public void shutdown() {
metastoreClient.close();
metaStoreClient.close();
if (server != null) {
server.stop();
@@ -107,7 +109,7 @@ class MetastoreCore {
}
}
private HiveConf hiveConf(int port) {
private HiveConf hiveConf(int port, Map<String, String> configOverrides) throws IOException {
thriftConnectionUri = "thrift://localhost:" + port;
final HiveConf hiveConf = new HiveConf(new Configuration(), this.getClass());
@@ -124,10 +126,12 @@ class MetastoreCore {
hiveConf.set(HIVE_SUPPORT_CONCURRENCY.getVarname(), "true");
hiveConf.setBoolean("hcatalog.hive.client.cache.disabled", true);
hiveConf.set(CONNECTION_POOLING_TYPE.getVarname(), "NONE");
hiveConf.set(HMS_HANDLER_FORCE_RELOAD_CONF.getVarname(), "true");
configOverrides.forEach(hiveConf::set);
writeHiveConfFile(hiveConf);
return hiveConf;
}
@@ -140,10 +144,10 @@ class MetastoreCore {
return new File(tempDir, "metastore_db").getPath();
}
private TServer thriftServer() throws TTransportException, MetaException, InvocationTargetException,
NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
private TServer thriftServer(Map<String, String> configOverrides) throws TTransportException, MetaException, InvocationTargetException,
NoSuchMethodException, IllegalAccessException, NoSuchFieldException, IOException {
final TServerSocketKeepAlive socket = new TServerSocketKeepAlive(new TServerSocket(0));
hiveConf = hiveConf(socket.getServerSocket().getLocalPort());
hiveConf = hiveConf(socket.getServerSocket().getLocalPort(), configOverrides);
final HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", hiveConf);
final IHMSHandler handler = RetryingHMSHandler.getProxy(hiveConf, baseHandler, true);
final TTransportFactory transportFactory = new TTransportFactory();
@@ -161,15 +165,20 @@ class MetastoreCore {
private void setupDB(String dbURL) throws SQLException, IOException {
final Connection connection = DriverManager.getConnection(dbURL);
ScriptRunner scriptRunner = new ScriptRunner(connection);
final ScriptRunner scriptRunner = new ScriptRunner(connection);
final URL initScript = getClass().getClassLoader().getResource("hive-schema-4.0.0-alpha-2.derby.sql");
final Reader reader = new BufferedReader(new FileReader(initScript.getFile()));
final InputStream inputStream = getClass().getClassLoader().getResourceAsStream("hive-schema-4.0.0-alpha-2.derby.sql");
final Reader reader = new BufferedReader(new InputStreamReader(inputStream));
scriptRunner.runScript(reader);
}
private String getDBPath() {
return Paths.get(tempDir.getAbsolutePath(), DB_NAME + ".db").toAbsolutePath().toString();
return Paths.get(tempDir.getAbsolutePath(), DATABASE_NAME + ".db").toAbsolutePath().toString();
}
private void writeHiveConfFile(HiveConf hiveConf) throws IOException {
File file = new File(tempDir.toPath() + "/hive-site.xml");
hiveConf.writeXml(Files.newOutputStream(file.toPath()));
}
public String getThriftConnectionUri() {
@@ -180,5 +189,17 @@ class MetastoreCore {
return tempDir.getAbsolutePath();
}
public HiveMetaStoreClient getMetaStoreClient() {
return metaStoreClient;
}
public Configuration getConfiguration() {
return hiveConf;
}
public String getConfigurationLocation() {
return tempDir.toPath() + "/hive-site.xml";
}
}
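For orientation, here is a minimal sketch of how MetastoreCore is driven. The class is package-private, so this hypothetical driver assumes it lives in org.apache.nifi.hive.metastore; the lifecycle calls and accessors match the code above, and the override key is only an example.

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import java.util.HashMap;
import java.util.Map;

class MetastoreCoreSketch {
    static void run() throws Exception {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("hive.metastore.transactional.event.listeners",
                "org.apache.hive.hcatalog.listener.DbNotificationListener");
        final MetastoreCore core = new MetastoreCore();
        core.initialize(overrides);                        // starts the Thrift server on a random free port
        final HiveMetaStoreClient client = core.getMetaStoreClient();
        System.out.println(core.getThriftConnectionUri()); // thrift://localhost:<port>
        core.shutdown();
    }
}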

View File

@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg.metastore;
package org.apache.nifi.hive.metastore;
import java.io.IOException;
import java.io.LineNumberReader;

View File

@@ -15,24 +15,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.iceberg.metastore;
package org.apache.nifi.hive.metastore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.HashMap;
import java.util.Map;
/** A JUnit 5 extension that runs a Hive Metastore Thrift service backed by an in-memory Derby database. */
public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
private final MetastoreCore metastoreCore;
private Map<String, String> configOverrides = new HashMap<>();
public ThriftMetastore() {
metastoreCore = new MetastoreCore();
}
public ThriftMetastore withConfigOverrides(Map<String, String> configs) {
configOverrides = configs;
return this;
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
metastoreCore.initialize();
metastoreCore.initialize(configOverrides);
}
@Override
@@ -47,4 +59,17 @@ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
public String getWarehouseLocation() {
return metastoreCore.getWarehouseLocation();
}
public HiveMetaStoreClient getMetaStoreClient() {
return metastoreCore.getMetaStoreClient();
}
public Configuration getConfiguration() {
return metastoreCore.getConfiguration();
}
public String getConfigurationLocation() {
return metastoreCore.getConfigurationLocation();
}
}
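A minimal sketch of registering the extension in a JUnit 5 test; this mirrors the pattern the new TriggerHiveMetaStoreEvent test uses further below, and the listener override is only needed when a test should observe notifications.

import java.util.Collections;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class MetastoreUsageSketch {
    @RegisterExtension
    ThriftMetastore metastore = new ThriftMetastore()
            .withConfigOverrides(Collections.singletonMap(
                    "hive.metastore.transactional.event.listeners",
                    "org.apache.hive.hcatalog.listener.DbNotificationListener"));

    @Test
    void metastoreIsRunning() throws Exception {
        // A fresh metastore starts before each test and is shut down afterwards.
        HiveMetaStoreClient client = metastore.getMetaStoreClient();
        client.getDatabase("test_metastore"); // database created by MetastoreCore.initialize()
    }
}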

View File

@@ -770,22 +770,6 @@ CREATE TABLE "APP"."SCHEMA_VERSION" (
CREATE UNIQUE INDEX "APP"."UNIQUE_SCHEMA_VERSION" ON "APP"."SCHEMA_VERSION" ("SCHEMA_ID", "VERSION");
CREATE TABLE REPL_TXN_MAP (
RTM_REPL_POLICY varchar(256) NOT NULL,
RTM_SRC_TXN_ID bigint NOT NULL,
RTM_TARGET_TXN_ID bigint NOT NULL,
PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
);
CREATE TABLE "APP"."RUNTIME_STATS" (
"RS_ID" bigint primary key,
"CREATE_TIME" integer not null,
"WEIGHT" integer not null,
"PAYLOAD" BLOB
);
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
WNL_ID bigint NOT NULL,
WNL_TXNID bigint NOT NULL,
@@ -801,88 +785,6 @@ CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
);
INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
CREATE TABLE "APP"."SCHEDULED_QUERIES" (
"SCHEDULED_QUERY_ID" bigint primary key not null,
"SCHEDULE_NAME" varchar(256) not null,
"ENABLED" CHAR(1) NOT NULL DEFAULT 'N',
"CLUSTER_NAMESPACE" varchar(256) not null,
"USER" varchar(128) not null,
"SCHEDULE" varchar(256) not null,
"QUERY" varchar(4000) not null,
"NEXT_EXECUTION" integer,
"ACTIVE_EXECUTION_ID" bigint
);
CREATE INDEX NEXTEXECUTIONINDEX ON APP.SCHEDULED_QUERIES (ENABLED,CLUSTER_NAMESPACE,NEXT_EXECUTION);
CREATE UNIQUE INDEX UNIQUE_SCHEDULED_QUERIES_NAME ON APP.SCHEDULED_QUERIES (SCHEDULE_NAME,CLUSTER_NAMESPACE);
CREATE TABLE APP.SCHEDULED_EXECUTIONS (
SCHEDULED_EXECUTION_ID bigint primary key not null,
EXECUTOR_QUERY_ID VARCHAR(256),
SCHEDULED_QUERY_ID bigint not null,
START_TIME integer not null,
END_TIME INTEGER,
LAST_UPDATE_TIME INTEGER,
ERROR_MESSAGE VARCHAR(2000),
STATE VARCHAR(256),
CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY (SCHEDULED_QUERY_ID) REFERENCES APP.SCHEDULED_QUERIES(SCHEDULED_QUERY_ID) ON DELETE CASCADE
);
CREATE INDEX LASTUPDATETIMEINDEX ON APP.SCHEDULED_EXECUTIONS (LAST_UPDATE_TIME);
CREATE INDEX SCHEDULED_EXECUTIONS_SCHQID ON APP.SCHEDULED_EXECUTIONS (SCHEDULED_QUERY_ID);
CREATE UNIQUE INDEX SCHEDULED_EXECUTIONS_UNIQUE_ID ON APP.SCHEDULED_EXECUTIONS (SCHEDULED_EXECUTION_ID);
--HIVE-23516
CREATE TABLE "APP"."REPLICATION_METRICS" (
"RM_SCHEDULED_EXECUTION_ID" bigint NOT NULL,
"RM_POLICY" varchar(256) NOT NULL,
"RM_DUMP_EXECUTION_ID" bigint NOT NULL,
"RM_METADATA" varchar(4000),
"RM_PROGRESS" varchar(10000),
"RM_START_TIME" integer not null,
"MESSAGE_FORMAT" VARCHAR(16) DEFAULT 'json-0.2',
PRIMARY KEY("RM_SCHEDULED_EXECUTION_ID")
);
CREATE INDEX "POLICY_IDX" ON "APP"."REPLICATION_METRICS" ("RM_POLICY");
CREATE INDEX "DUMP_IDX" ON "APP"."REPLICATION_METRICS" ("RM_DUMP_EXECUTION_ID");
-- Create stored procedure tables
CREATE TABLE "APP"."STORED_PROCS" (
"SP_ID" BIGINT NOT NULL,
"CREATE_TIME" INTEGER NOT NULL,
"DB_ID" BIGINT NOT NULL,
"NAME" VARCHAR(256) NOT NULL,
"OWNER_NAME" VARCHAR(128) NOT NULL,
"SOURCE" clob NOT NULL,
PRIMARY KEY ("SP_ID")
);
CREATE UNIQUE INDEX "UNIQUESTOREDPROC" ON "STORED_PROCS" ("NAME", "DB_ID");
ALTER TABLE "STORED_PROCS" ADD CONSTRAINT "STOREDPROC_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID");
CREATE TABLE "APP"."DATACONNECTORS" ("NAME" VARCHAR(128) NOT NULL, "TYPE" VARCHAR(32) NOT NULL, "URL" VARCHAR(4000) NOT NULL, "COMMENT" VARCHAR(256), "OWNER_NAME" VARCHAR(256), "OWNER_TYPE" VARCHAR(10), "CREATE_TIME" INTEGER NOT NULL);
CREATE TABLE "APP"."DATACONNECTOR_PARAMS" ("NAME" VARCHAR(128) NOT NULL, "PARAM_KEY" VARCHAR(180) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
ALTER TABLE "APP"."DATACONNECTORS" ADD CONSTRAINT "DATACONNECTORS_KEY_PK" PRIMARY KEY ("NAME");
ALTER TABLE "APP"."DATACONNECTOR_PARAMS" ADD CONSTRAINT "DATACONNECTOR_PARAMS_KEY_PK" PRIMARY KEY ("NAME", "PARAM_KEY");
ALTER TABLE "APP"."DATACONNECTOR_PARAMS" ADD CONSTRAINT "NAME_FK1" FOREIGN KEY ("NAME") REFERENCES "APP"."DATACONNECTORS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
ALTER TABLE "APP"."DC_PRIVS" ADD CONSTRAINT "DC_PRIVS_FK1" FOREIGN KEY ("NAME") REFERENCES "APP"."DATACONNECTORS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
-- Create stored procedure packages
CREATE TABLE "APP"."PACKAGES" (
"PKG_ID" BIGINT NOT NULL,
"CREATE_TIME" INTEGER NOT NULL,
"DB_ID" BIGINT NOT NULL,
"NAME" VARCHAR(256) NOT NULL,
"OWNER_NAME" VARCHAR(128) NOT NULL,
"HEADER" clob NOT NULL,
"BODY" clob NOT NULL,
PRIMARY KEY ("PKG_ID")
);
CREATE UNIQUE INDEX "UNIQUEPKG" ON "PACKAGES" ("NAME", "DB_ID");
ALTER TABLE "PACKAGES" ADD CONSTRAINT "PACKAGES_FK1" FOREIGN KEY ("DB_ID") REFERENCES "DBS" ("DB_ID");
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------

View File

@@ -371,6 +371,12 @@
<version>1.20.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hive-test-utils</artifactId>
<version>1.20.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@@ -0,0 +1,462 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.ValidationResources;
import org.apache.thrift.TException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"hive", "metastore", "notification", "insert", "delete", "partition", "event"})
@CapabilityDescription("The processor is capable to trigger different type of events in the HiveMetaStore and generate notifications. " +
"The metastore action to be executed is determined from the incoming path and event type attributes. " +
"The supported event type values are 'put' in case of data insertion or 'delete' in case of data removal. " +
"The notifications should be enabled in the metastore configuration to generate them e.g.: the 'hive.metastore.transactional.event.listeners' " +
"should have a proper listener configured, for instance 'org.apache.hive.hcatalog.listener.DbNotificationListener'.")
@WritesAttributes({
@WritesAttribute(attribute = "metastore.notification.event", description = "The event type of the triggered notification.")
})
public class TriggerHiveMetaStoreEvent extends AbstractProcessor {
public static final String METASTORE_NOTIFICATION_EVENT = "metastore.notification.event";
private final HiveConfigurator hiveConfigurator = new HiveConfigurator();
private Configuration hiveConfig;
private final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
private volatile UserGroupInformation ugi;
// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
.name("hive-metastore-uri")
.displayName("Hive Metastore URI")
.description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. "
+ "The default port for the Hive metastore is 9043. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources "
+ "will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.URI_LIST_VALIDATOR)
.build();
static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("hive-config-resources")
.displayName("Hive Configuration Resources")
.description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor EVENT_TYPE = new PropertyDescriptor.Builder()
.name("event-type")
.displayName("Event Type")
.description("The type of the event. The acceptable values are 'put' in case of data insert or 'delete' in case of data removal.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("${event.type}")
.build();
static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
.name("path")
.displayName("Path")
.description("The path of the file or folder located in the file system.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("${path}")
.build();
static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
.name("catalog-name")
.displayName("Catalog Name")
.description("The name of the catalog.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("database-name")
.displayName("Database Name")
.description("The name of the database.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("table-name")
.displayName("Table Name")
.description("The name of the table.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.")
.identifiesControllerService(KerberosUserService.class)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after the data ingestion was successful.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if the data ingestion failed and retrying the operation will also fail, such as an invalid data or schema.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
METASTORE_URI,
HIVE_CONFIGURATION_RESOURCES,
EVENT_TYPE,
PATH,
CATALOG_NAME,
DATABASE_NAME,
TABLE_NAME,
KERBEROS_USER_SERVICE
));
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>();
final boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
if (confFileProvided) {
final boolean kerberosUserServiceIsSet = validationContext.getProperty(KERBEROS_USER_SERVICE).isSet();
final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
final Configuration config = hiveConfigurator.getConfigurationForValidation(validationResourceHolder, configFiles, getLogger());
final boolean securityEnabled = SecurityUtil.isSecurityEnabled(config);
if (securityEnabled && !kerberosUserServiceIsSet) {
problems.add(new ValidationResult.Builder()
.subject(KERBEROS_USER_SERVICE.getDisplayName())
.valid(false)
.explanation("Security authentication is set to 'kerberos' in the configuration files but no KerberosUserService is configured.")
.build());
}
}
return problems;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
if (context.getProperty(METASTORE_URI).isSet()) {
hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue());
}
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
kerberosUserReference.set(kerberosUserService.createKerberosUser());
try {
ugi = hiveConfigurator.authenticate(hiveConfig, kerberosUserReference.get());
} catch (AuthenticationFailedException e) {
getLogger().error(e.getMessage(), e);
throw new ProcessException(e);
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final KerberosUser kerberosUser = kerberosUserReference.get();
if (kerberosUser == null) {
doOnTrigger(context, session, flowFile);
} else {
try {
getUgi().doAs((PrivilegedExceptionAction<Void>) () -> {
doOnTrigger(context, session, flowFile);
return null;
});
} catch (Exception e) {
getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
session.transfer(flowFile, REL_FAILURE);
}
}
}
public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
String catalogName;
if (context.getProperty(CATALOG_NAME).isSet()) {
catalogName = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue();
} else {
catalogName = MetaStoreUtils.getDefaultCatalog(hiveConfig);
}
final String eventType = context.getProperty(EVENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String path = context.getProperty(PATH).evaluateAttributeExpressions(flowFile).getValue();
try (final HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConfig)) {
final Table table = metaStoreClient.getTable(catalogName, databaseName, tableName);
final boolean isPartitioned = !table.getPartitionKeys().isEmpty();
switch (eventType.toLowerCase().trim()) {
case "put":
if (isPartitioned) {
EventMessage.EventType eventMessageType = handlePartitionedInsert(metaStoreClient, catalogName, databaseName, tableName, path);
flowFile = session.putAttribute(flowFile, METASTORE_NOTIFICATION_EVENT, eventMessageType.toString());
} else {
handleFileInsert(metaStoreClient, catalogName, databaseName, tableName, path, null);
flowFile = session.putAttribute(flowFile, METASTORE_NOTIFICATION_EVENT, EventMessage.EventType.INSERT.toString());
}
break;
case "delete":
if (isPartitioned) {
handleDropPartition(metaStoreClient, catalogName, databaseName, tableName, path);
flowFile = session.putAttribute(flowFile, METASTORE_NOTIFICATION_EVENT, EventMessage.EventType.DROP_PARTITION.toString());
} else {
getLogger().warn("The target table '{}' is not partitioned. No metastore action was executed.", tableName);
}
break;
default:
getLogger().error("Unknown event type '{}'", eventType);
session.transfer(flowFile, REL_FAILURE);
return;
}
} catch (Exception e) {
getLogger().error("Error occurred while metastore event processing", e);
session.transfer(flowFile, REL_FAILURE);
return;
}
session.transfer(flowFile, REL_SUCCESS);
}
/**
* Handles an insert event for a partitioned table. If the partition already exists, an insert event is fired in the metastore; otherwise a new partition is appended to the table.
*
* @param catalogName name of the catalog
* @param databaseName name of the database
* @param tableName name of the table
* @param path path for the file or directory
* @return type of the executed event
* @throws TException thrown if the metastore action fails
*/
private EventMessage.EventType handlePartitionedInsert(HiveMetaStoreClient metaStoreClient, String catalogName,
String databaseName, String tableName, String path) throws TException {
final List<String> partitionValues = getPartitionValuesFromPath(path);
try {
// Check if the partition already exists for the file to be inserted
metaStoreClient.getPartition(databaseName, tableName, partitionValues);
getLogger().debug("Creating file insert for partition with values {}", partitionValues);
handleFileInsert(metaStoreClient, catalogName, databaseName, tableName, path, partitionValues);
return EventMessage.EventType.INSERT;
} catch (Exception e) {
if (e instanceof NoSuchObjectException) {
getLogger().debug("Partition with values {} does not exists. Trying to append new partition", partitionValues, e);
try {
metaStoreClient.appendPartition(catalogName, databaseName, tableName, partitionValues);
return EventMessage.EventType.ADD_PARTITION;
} catch (TException ex) {
throw new TException("Failed to append partition with values " + partitionValues, ex);
}
}
throw new TException("Error occurred during partitioned file insertion with values " + partitionValues, e);
}
}
/**
* Parses the partition values from the file or directory path,
* e.g. hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/file -> partition values [5, 10].
*
* @param path path for the file or directory
* @return partition values
*/
private List<String> getPartitionValuesFromPath(String path) {
final String[] pathParts = path.split("/");
List<String> partitionValues = new ArrayList<>();
for (String pathPart : pathParts) {
if (pathPart.contains("=")) {
final String[] partitionParts = pathPart.split("=");
partitionValues.add(partitionParts[1]);
}
}
getLogger().debug("The following partition values were processed from path '{}': {}", path, partitionValues);
return partitionValues;
}
/**
* Constructs a file insert event and sends it to the metastore.
*
* @param catalogName name of the catalog
* @param databaseName name of the database
* @param tableName name of the table
* @param path path for the file or directory
* @param partitionValues partition values
* @throws IOException thrown if an error occurs during checksum generation
* @throws TException thrown if the metastore action fails
*/
private void handleFileInsert(HiveMetaStoreClient metaStoreClient, String catalogName, String databaseName,
String tableName, String path, List<String> partitionValues) throws IOException, TException {
final InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
insertEventRequestData.setReplace(false);
insertEventRequestData.addToFilesAdded(path);
insertEventRequestData.addToFilesAddedChecksum(checksumFor(path));
final FireEventRequestData fireEventRequestData = new FireEventRequestData();
fireEventRequestData.setInsertData(insertEventRequestData);
final FireEventRequest fireEventRequest = new FireEventRequest(true, fireEventRequestData);
fireEventRequest.setCatName(catalogName);
fireEventRequest.setDbName(databaseName);
fireEventRequest.setTableName(tableName);
if (partitionValues != null) {
fireEventRequest.setPartitionVals(partitionValues);
}
metaStoreClient.fireListenerEvent(fireEventRequest);
}
/**
* Triggers a drop partition event in the metastore with the partition values parsed from the provided path.
*
* @param catalogName name of the catalog
* @param databaseName name of the database
* @param tableName name of the table
* @param path path for the file or directory
* @throws TException thrown if the metastore action fails
*/
private void handleDropPartition(HiveMetaStoreClient metaStoreClient, String catalogName, String databaseName,
String tableName, String path) throws TException {
final List<String> partitionValues = getPartitionValuesFromPath(path);
try {
metaStoreClient.dropPartition(catalogName, databaseName, tableName, partitionValues, true);
} catch (TException e) {
if (e instanceof NoSuchObjectException) {
getLogger().error("Failed to drop partition. Partition with values {} does not exists.", partitionValues, e);
}
throw new TException(e);
}
}
/**
* Generates a checksum for the file on the given path if the FileSystem supports it.
*
* @param filePath path for the file
* @return checksum for the file, or an empty string if none is available
* @throws IOException thrown if there is an issue getting the FileSystem
*/
private String checksumFor(String filePath) throws IOException {
final Path path = new Path(filePath);
final FileSystem fileSystem = path.getFileSystem(hiveConfig);
final FileChecksum checksum = fileSystem.getFileChecksum(path);
if (checksum != null) {
return StringUtils.byteToHexString(checksum.getBytes(), 0, checksum.getLength());
}
return "";
}
/**
* Checks whether the user's TGT has expired and performs a re-login if needed.
*
* @return ugi
*/
private UserGroupInformation getUgi() {
KerberosUser kerberosUser = kerberosUserReference.get();
try {
kerberosUser.checkTGTAndRelogin();
} catch (KerberosLoginException e) {
throw new ProcessException("Unable to re-login with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
return ugi;
}
}
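To see what the processor actually produced, the metastore notification log can be polled with the same client calls the new tests use below. A hedged sketch, assuming a metastore with a DbNotificationListener configured and an already connected HiveMetaStoreClient:

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;

class NotificationLogSketch {
    // Prints the most recent notification event, e.g. "INSERT on test_metastore.test_table".
    static void printLatestNotification(HiveMetaStoreClient client) throws Exception {
        CurrentNotificationEventId last = client.getCurrentNotificationEventId();
        NotificationEventResponse response = client.getNextNotification(last.getEventId() - 1, 0, null);
        for (NotificationEvent event : response.getEvents()) {
            System.out.println(event.getEventType() + " on " + event.getDbName() + "." + event.getTableName());
        }
    }
}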

View File

@@ -31,7 +31,6 @@ import org.apache.nifi.security.krb.KerberosUser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -42,7 +41,12 @@ public class HiveConfigurator {
public Collection<ValidationResult> validate(String configFiles, String principal, String keyTab, String password,
AtomicReference<ValidationResources> validationResourceHolder, ComponentLog log) {
final List<ValidationResult> problems = new ArrayList<>();
final Configuration hiveConfig = getConfigurationForValidation(validationResourceHolder, configFiles, log);
return new ArrayList<>(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log));
}
public Configuration getConfigurationForValidation(AtomicReference<ValidationResources> validationResourceHolder, String configFiles, ComponentLog log) {
ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded,
@@ -53,11 +57,7 @@ public class HiveConfigurator {
validationResourceHolder.set(resources);
}
final Configuration hiveConfig = resources.getConfiguration();
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(this.getClass().getSimpleName(), hiveConfig, principal, keyTab, password, log));
return problems;
return resources.getConfiguration();
}
public HiveConf getConfigurationFromFiles(final String configFiles) {

View File

@@ -17,3 +17,4 @@ org.apache.nifi.processors.hive.PutHive3QL
org.apache.nifi.processors.hive.PutHive3Streaming
org.apache.nifi.processors.orc.PutORC
org.apache.nifi.processors.hive.UpdateHive3Table
org.apache.nifi.processors.hive.TriggerHiveMetaStoreEvent

View File

@@ -0,0 +1,52 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>TriggerHiveMetaStoreEvent</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>TriggerHiveMetaStoreEvent</h1>
<h3>Event Types</h3>
<p>
The processor supports the following event types:
<dl>
<dt>Put</dt>
<dd>
The put event type represents a file or directory insertion. The metastore action depends on the target table. If the table is partitioned,
the processor parses the target partition values from the path, e.g. if the inserted file's path is 'hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/file'
then the partition values come from the '/a=5/b=10/' path segments -> [5,10]. If the parsed partition does not exist, an 'APPEND PARTITION' event is fired
and the new partition is registered in the metastore. If the partition already exists, only an 'INSERT' event is fired.
</dd>
<dt>Delete</dt>
<dd>
The delete event type represents a 'DROP PARTITION' metastore action. The processor parses the target partition values from the path,
e.g. if the deleted directory's path is 'hdfs://localhost:9000/user/hive/warehouse/table/a=5/b=10/' then the partition values come from the '/a=5/b=10/' path segments -> [5,10].
If the table is not partitioned, no metastore action is executed.
</dd>
</dl>
Any other event type causes an error, and the FlowFile is routed to the failure relationship of the processor.
</p>
</body>
</html>

View File

@@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS;
import static org.apache.nifi.processors.hive.TriggerHiveMetaStoreEvent.METASTORE_NOTIFICATION_EVENT;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
public class TestTriggerHiveMetaStoreEvent {
private TestRunner runner;
private TriggerHiveMetaStoreEvent processor;
private HiveMetaStoreClient metaStoreClient;
private static final String TEST_DATABASE_NAME = "test_metastore";
private static final String TEST_TABLE_NAME = "test_table";
private static final List<FieldSchema> PARTITION_COLUMNS = Arrays.asList(
new FieldSchema("year", "string", "year partition column"),
new FieldSchema("month", "string", "month partition column")
);
@RegisterExtension
public ThriftMetastore metastore = new ThriftMetastore()
.withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(), "org.apache.hive.hcatalog.listener.DbNotificationListener"));
@BeforeEach
public void setUp() {
processor = new TriggerHiveMetaStoreEvent();
metaStoreClient = metastore.getMetaStoreClient();
}
private void initUnPartitionedTable() throws Exception {
createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, Collections.emptyList(), metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME);
}
private void initPartitionedTable() throws Exception {
Table table = createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, PARTITION_COLUMNS, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME);
createPartition(table, Lists.newArrayList("2017", "march"));
createPartition(table, Lists.newArrayList("2017", "april"));
createPartition(table, Lists.newArrayList("2018", "march"));
}
@DisabledOnOs(WINDOWS)
@Test
public void testInsertOnUnPartitionedTable() throws Exception {
initUnPartitionedTable();
runner = TestRunners.newTestRunner(processor);
runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "put");
runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/test_file");
runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0);
Assertions.assertEquals(EventMessage.EventType.INSERT.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT));
CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId();
NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null);
NotificationEvent event = response.getEvents().get(0);
InsertMessage insertMessage = MessageFactory.getInstance().getDeserializer().getInsertMessage(event.getMessage());
insertMessage.getFiles().forEach(s -> assertEquals(s, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/test_file###"));
assertEquals(event.getEventType(), EventMessage.EventType.INSERT.toString());
assertEquals(event.getDbName(), TEST_DATABASE_NAME);
assertEquals(insertMessage.getDB(), TEST_DATABASE_NAME);
assertEquals(event.getTableName(), TEST_TABLE_NAME);
assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
}
@DisabledOnOs(WINDOWS)
@Test
public void testInsertOnPartitionedTable() throws Exception {
initPartitionedTable();
assertEquals(PARTITION_COLUMNS, metaStoreClient.getTable(TEST_DATABASE_NAME, TEST_TABLE_NAME).getPartitionKeys());
runner = TestRunners.newTestRunner(processor);
runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "put");
runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=march/test_file");
runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0);
Assertions.assertEquals(EventMessage.EventType.INSERT.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT));
CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId();
NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null);
NotificationEvent event = response.getEvents().get(0);
InsertMessage insertMessage = MessageFactory.getInstance().getDeserializer().getInsertMessage(event.getMessage());
insertMessage.getFiles().forEach(s -> assertEquals(s, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=march/test_file###"));
assertEquals(event.getEventType(), EventMessage.EventType.INSERT.toString());
assertEquals(event.getDbName(), TEST_DATABASE_NAME);
assertEquals(insertMessage.getDB(), TEST_DATABASE_NAME);
assertEquals(event.getTableName(), TEST_TABLE_NAME);
assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
}
@DisabledOnOs(WINDOWS)
@Test
public void testAddPartition() throws Exception {
initPartitionedTable();
assertEquals(PARTITION_COLUMNS, metaStoreClient.getTable(TEST_DATABASE_NAME, TEST_TABLE_NAME).getPartitionKeys());
runner = TestRunners.newTestRunner(processor);
runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "put");
runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=june/test_file");
runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0);
Assertions.assertEquals(EventMessage.EventType.ADD_PARTITION.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT));
CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId();
NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null);
NotificationEvent event = response.getEvents().get(0);
AddPartitionMessage addPartitionMessage = MessageFactory.getInstance().getDeserializer().getAddPartitionMessage(event.getMessage());
Map<String, String> expectedPartition = new HashMap<String, String>() {{
put("year", "2017");
put("month", "june");
}};
assertEquals(addPartitionMessage.getPartitions().get(0), expectedPartition);
assertEquals(event.getEventType(), EventMessage.EventType.ADD_PARTITION.toString());
assertEquals(event.getDbName(), TEST_DATABASE_NAME);
assertEquals(addPartitionMessage.getDB(), TEST_DATABASE_NAME);
assertEquals(event.getTableName(), TEST_TABLE_NAME);
assertEquals(addPartitionMessage.getTable(), TEST_TABLE_NAME);
assertDoesNotThrow(() -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
}
@DisabledOnOs(WINDOWS)
@Test
public void testDropPartition() throws Exception {
initPartitionedTable();
assertEquals(PARTITION_COLUMNS, metaStoreClient.getTable(TEST_DATABASE_NAME, TEST_TABLE_NAME).getPartitionKeys());
runner = TestRunners.newTestRunner(processor);
runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "delete");
runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/year=2017/month=march/test_file");
runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(TriggerHiveMetaStoreEvent.REL_SUCCESS).get(0);
Assertions.assertEquals(EventMessage.EventType.DROP_PARTITION.toString(), flowFile.getAttribute(METASTORE_NOTIFICATION_EVENT));
CurrentNotificationEventId eventId = metaStoreClient.getCurrentNotificationEventId();
NotificationEventResponse response = metaStoreClient.getNextNotification(eventId.getEventId() - 1, 0, null);
NotificationEvent event = response.getEvents().get(0);
DropPartitionMessage dropPartitionMessage = MessageFactory.getInstance().getDeserializer().getDropPartitionMessage(event.getMessage());
Map<String, String> expectedPartition = new HashMap<String, String>() {{
put("year", "2017");
put("month", "march");
}};
assertEquals(dropPartitionMessage.getPartitions().get(0), expectedPartition);
assertEquals(event.getEventType(), EventMessage.EventType.DROP_PARTITION.toString());
assertEquals(event.getDbName(), TEST_DATABASE_NAME);
assertEquals(dropPartitionMessage.getDB(), TEST_DATABASE_NAME);
assertEquals(event.getTableName(), TEST_TABLE_NAME);
assertEquals(dropPartitionMessage.getTable(), TEST_TABLE_NAME);
assertThrows(NoSuchObjectException.class, () -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "march")));
}
@DisabledOnOs(WINDOWS)
@Test
public void testUnknownEventType() throws Exception {
initUnPartitionedTable();
runner = TestRunners.newTestRunner(processor);
runner.setProperty(TriggerHiveMetaStoreEvent.HIVE_CONFIGURATION_RESOURCES, metastore.getConfigurationLocation());
runner.setProperty(TriggerHiveMetaStoreEvent.EVENT_TYPE, "unknown");
runner.setProperty(TriggerHiveMetaStoreEvent.PATH, metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME + "/test_file");
runner.setProperty(TriggerHiveMetaStoreEvent.DATABASE_NAME, TEST_DATABASE_NAME);
runner.setProperty(TriggerHiveMetaStoreEvent.TABLE_NAME, TEST_TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(TriggerHiveMetaStoreEvent.REL_FAILURE, 1);
}
private Table createTable(String databaseName, String tableName, List<FieldSchema> partitionColumns, String location) throws Exception {
return new TableBuilder()
.setDbName(databaseName)
.setTableName(tableName)
.addCol("id", "int", "id column")
.addCol("value", "string", "value column")
.setPartCols(partitionColumns)
.setType(TableType.EXTERNAL_TABLE.name())
.setLocation(location)
.create(metaStoreClient, metastore.getConfiguration());
}
private void createPartition(Table table, List<String> values) throws Exception {
new PartitionBuilder()
.inTable(table)
.setValues(values)
.addToTable(metaStoreClient, metastore.getConfiguration());
}
}

View File

@@ -35,6 +35,7 @@
<module>nifi-hive_1_1-nar</module>
<module>nifi-hive3-processors</module>
<module>nifi-hive3-nar</module>
<module>nifi-hive-test-utils</module>
</modules>
<dependencyManagement>

View File

@@ -171,6 +171,12 @@
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hive-test-utils</artifactId>
<version>1.20.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>

View File

@@ -28,8 +28,8 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
import org.apache.nifi.processors.iceberg.metastore.ThriftMetastore;
import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;