NIFI-11427 Upgraded Atlas from 2.2.0 to 2.3.0

This closes #7158

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-04-10 21:55:41 +03:00 committed by exceptionfactory
parent 3b216ced92
commit cbca499070
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 8 additions and 180 deletions

View File

@ -31,6 +31,7 @@ import java.util.List;
public class NiFiAtlasHook extends AtlasHook implements LineageContext { public class NiFiAtlasHook extends AtlasHook implements LineageContext {
public static final String NIFI_USER = "nifi"; public static final String NIFI_USER = "nifi";
public static final String NIFI_SOURCE = "nifi";
private NiFiAtlasClient atlasClient; private NiFiAtlasClient atlasClient;
@ -45,6 +46,11 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
messages.add(message); messages.add(message);
} }
@Override
public String getMessageSource() {
return NIFI_SOURCE;
}
public void commitMessages() { public void commitMessages() {
final NotificationSender notificationSender = createNotificationSender(); final NotificationSender notificationSender = createNotificationSender();
notificationSender.setAtlasClient(atlasClient); notificationSender.setAtlasClient(atlasClient);

View File

@ -80,7 +80,6 @@ public class AtlasAPIV2ServerEmulator {
private Server server; private Server server;
private ServerConnector httpConnector; private ServerConnector httpConnector;
private AtlasNotificationServerEmulator notificationServerEmulator; private AtlasNotificationServerEmulator notificationServerEmulator;
private EmbeddedKafka embeddedKafka;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final AtlasAPIV2ServerEmulator emulator = new AtlasAPIV2ServerEmulator(); final AtlasAPIV2ServerEmulator emulator = new AtlasAPIV2ServerEmulator();
@ -95,9 +94,6 @@ public class AtlasAPIV2ServerEmulator {
server.start(); server.start();
logger.info("Starting {} on port {}", AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort()); logger.info("Starting {} on port {}", AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort());
embeddedKafka = new EmbeddedKafka();
embeddedKafka.start();
notificationServerEmulator.consume(m -> { notificationServerEmulator.consume(m -> {
if (m instanceof HookNotificationV1.EntityCreateRequest) { if (m instanceof HookNotificationV1.EntityCreateRequest) {
HookNotificationV1.EntityCreateRequest em = (HookNotificationV1.EntityCreateRequest) m; HookNotificationV1.EntityCreateRequest em = (HookNotificationV1.EntityCreateRequest) m;
@ -225,7 +221,6 @@ public class AtlasAPIV2ServerEmulator {
public void stop() throws Exception { public void stop() throws Exception {
notificationServerEmulator.stop(); notificationServerEmulator.stop();
embeddedKafka.stop();
server.stop(); server.stop();
} }

View File

@ -1,173 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.atlas.emulator;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Properties;
/**
* Embedded Kafka server, primarily to be used for testing.
*/
public class EmbeddedKafka {
private final KafkaServerStartable kafkaServer;
private final Properties zookeeperConfig;
private final Properties kafkaConfig;
private final ZooKeeperServer zkServer;
private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
private boolean started;
/**
* Will create instance of the embedded Kafka server. Kafka and Zookeeper
* configuration properties will be loaded from 'server.properties' and
* 'zookeeper.properties' located at the root of the classpath.
*/
public EmbeddedKafka() {
this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
}
/**
* Will create instance of the embedded Kafka server.
*
* @param kafkaConfig
* Kafka configuration properties
* @param zookeeperConfig
* Zookeeper configuration properties
*/
public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
this.cleanupKafkaWorkDir();
this.kafkaConfig = kafkaConfig;
this.zookeeperConfig = zookeeperConfig;
this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
this.zkServer = new ZooKeeperServer();
}
/**
* Will start embedded Kafka server. Its data directories will be created
* at 'kafka-tmp' directory relative to the working directory of the current
* runtime. The data directories will be deleted upon JVM exit.
*
*/
public void start() {
if (!this.started) {
logger.info("Starting Zookeeper server");
this.startZookeeper();
logger.info("Starting Kafka server");
this.kafkaServer.startup();
logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.staticServerConfig().port()
+ ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
this.started = true;
}
}
/**
* Will stop embedded Kafka server, cleaning up all working directories.
*/
public void stop() {
if (this.started) {
logger.info("Shutting down Kafka server");
this.kafkaServer.shutdown();
this.kafkaServer.awaitShutdown();
logger.info("Shutting down Zookeeper server");
this.shutdownZookeeper();
logger.info("Embedded Kafka is shut down.");
this.cleanupKafkaWorkDir();
this.started = false;
}
}
/**
*
*/
private void cleanupKafkaWorkDir() {
File kafkaTmp = new File("target/kafka-tmp");
try {
FileUtils.deleteDirectory(kafkaTmp);
} catch (Exception e) {
logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
}
}
/**
* Will start Zookeeper server via {@link ServerCnxnFactory}
*/
private void startZookeeper() {
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
quorumConfiguration.parseProperties(this.zookeeperConfig);
ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);
FileTxnSnapLog txnLog = new FileTxnSnapLog(configuration.getDataLogDir(), configuration.getDataDir());
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(configuration.getTickTime());
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
configuration.getMaxClientCnxns());
zookeeperConnectionFactory.startup(zkServer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new IllegalStateException("Failed to start Zookeeper server", e);
}
}
/**
* Will shut down Zookeeper server.
*/
private void shutdownZookeeper() {
zkServer.shutdown();
}
/**
* Will load {@link Properties} from properties file discovered at the
* provided path relative to the root of the classpath.
*/
private static Properties loadPropertiesFromClasspath(String path) {
try {
Properties kafkaProperties = new Properties();
kafkaProperties.load(Class.class.getResourceAsStream(path));
return kafkaProperties;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}

View File

@ -82,7 +82,7 @@ public class TestAwsS3DirectoryV2 extends AbstractTestAwsS3Directory {
assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName()); assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName());
assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME));
assertEquals(directory, ref.get(ATTR_NAME)); assertEquals(directory, ref.get(ATTR_NAME));
assertEquals(StringUtils.substringBeforeLast(actualPath, "/") + "/", ref.get(ATTR_OBJECT_PREFIX_V2)); assertEquals(actualPath + "/", ref.get(ATTR_OBJECT_PREFIX_V2));
assertNotNull(ref.get(ATTR_CONTAINER_V2)); assertNotNull(ref.get(ATTR_CONTAINER_V2));
ref = (Referenceable) ref.get(ATTR_CONTAINER_V2); ref = (Referenceable) ref.get(ATTR_CONTAINER_V2);

View File

@ -26,7 +26,7 @@
<packaging>pom</packaging> <packaging>pom</packaging>
<properties> <properties>
<atlas.version>2.2.0</atlas.version> <atlas.version>2.3.0</atlas.version>
</properties> </properties>
<modules> <modules>