NIFI-12206 Refactor Flow History using JetBrains Xodus (#7870)

* NIFI-12206 Refactored Flow History using JetBrains Xodus

- Replaced the H2 Database Engine with JetBrains Xodus for persistent storage of FlowConfigurationHistory
- Added an EntityStoreAuditService implementation using the Xodus PersistentEntityStore
- Removed nifi.h2.url.append from nifi.properties and the NiFiProperties class
Authored by exceptionfactory on 2023-10-12 13:50:25 -05:00, committed by GitHub
parent abfc49e212
commit 22ad7d542d
80 changed files with 1358 additions and 2398 deletions
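
As context for the diffs that follow, a minimal sketch of the new wiring: the flow configuration history store is now opened directly against the configured nifi.database.directory, with no JDBC URL, credentials, or connection pool involved. The directory value matches the default shown in the property files below; the wrapper class and main method are illustrative assumptions, not part of the commit.

import java.io.File;

import org.apache.nifi.admin.service.EntityStoreAuditService;

public class FlowHistoryStoreSketch {
    public static void main(final String[] args) throws Exception {
        // nifi.database.directory still points here; only the nifi.h2.url.append property goes away
        final File databaseDirectory = new File("./database_repository");

        // The constructor added in this commit opens a Xodus Environment and PersistentEntityStore
        // in that directory; close() releases both, so try-with-resources works.
        try (final EntityStoreAuditService auditService = new EntityStoreAuditService(databaseDirectory)) {
            System.out.println("Flow configuration history store opened at " + databaseDirectory);
        }
    }
}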


@@ -99,7 +99,6 @@ public class MiNiFiPropertiesGenerator {
Triple.of(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "./conf/state-management.xml", "# State Management"),
Triple.of(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider", "# The ID of the local state provider"),
Triple.of(NiFiProperties.REPOSITORY_DATABASE_DIRECTORY, "./database_repository", "# H2 Settings"),
-Triple.of(NiFiProperties.H2_URL_APPEND, ";LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE", EMPTY),
Triple.of(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository", "# FlowFile Repository"),
Triple.of(NiFiProperties.FLOWFILE_REPOSITORY_DIRECTORY, "./flowfile_repository", EMPTY),
Triple.of(NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, "20 secs", EMPTY),


@@ -41,9 +41,8 @@ nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
-# H2 Settings
+# Database Settings
nifi.database.directory=./database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository


@@ -41,9 +41,8 @@ nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
-# H2 Settings
+# Database Settings
nifi.database.directory=./database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository


@@ -41,9 +41,8 @@ nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
-# H2 Settings
+# Database Settings
nifi.database.directory=./database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository


@@ -41,9 +41,8 @@ nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
-# H2 Settings
+# Database Settings
nifi.database.directory=./database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository


@@ -30,9 +30,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
-# H2 Settings
+# Database Settings
nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo


@@ -74,7 +74,6 @@ public class NiFiProperties extends ApplicationProperties {
public static final String COMPONENT_DOCS_DIRECTORY = "nifi.documentation.working.directory";
public static final String SENSITIVE_PROPS_KEY = "nifi.sensitive.props.key";
public static final String SENSITIVE_PROPS_ALGORITHM = "nifi.sensitive.props.algorithm";
-public static final String H2_URL_APPEND = "nifi.h2.url.append";
public static final String REMOTE_INPUT_HOST = "nifi.remote.input.host";
public static final String REMOTE_INPUT_PORT = "nifi.remote.input.socket.port";
public static final String SITE_TO_SITE_SECURE = "nifi.remote.input.secure";


@@ -29,9 +29,8 @@ nifi.nar.library.directory=
nifi.custom.nar.library.directory.alt=
nifi.nar.working.directory=./target/work/nar/
-# H2 Settings
+# Database Settings
nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo


@@ -29,7 +29,6 @@ nifi.nar.working.directory=./target/work/nar/
# H2 Settings
nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo


@@ -29,9 +29,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
-# H2 Settings
+# Database Settings
nifi.database.directory=./target/database_repository
-nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo


@@ -3370,14 +3370,13 @@ for components to persist state. See the <<state_management>> section for more i
|====
-=== H2 Settings
+=== Database Settings
-The H2 Settings section defines the settings for the H2 database, which keeps track of user access and flow controller history.
+The Database Settings section defines the settings for the internal database, which tracks flow configuration history.
|====
|*Property*|*Description*
|`nifi.database.directory`*|The location of the H2 database directory. The default value is `./database_repository`.
-|`nifi.h2.url.append`|This property specifies additional arguments to add to the connection string for the H2 database. The default value should be used and should not be changed. It is: `;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE`.
|====
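
For reference, after this change the Database Settings section of nifi.properties reduces to the single directory entry shown in the property file diffs above:

# Database Settings
nifi.database.directory=./database_repository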
[[repository-encryption-properties]]


@@ -21,6 +21,9 @@
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-administration</artifactId>
+<properties>
+<xodus.version>2.0.1</xodus.version>
+</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -35,48 +38,14 @@
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
-<groupId>org.apache.nifi</groupId>
-<artifactId>nifi-framework-core-api</artifactId>
+<groupId>org.jetbrains.xodus</groupId>
+<artifactId>xodus-openAPI</artifactId>
+<version>${xodus.version}</version>
</dependency>
<dependency>
-<groupId>org.apache.nifi</groupId>
-<artifactId>nifi-nar-utils</artifactId>
-</dependency>
-<dependency>
-<groupId>org.apache.nifi</groupId>
-<artifactId>nifi-utils</artifactId>
-</dependency>
-<dependency>
-<groupId>org.apache.nifi</groupId>
-<artifactId>nifi-properties</artifactId>
-</dependency>
-<dependency>
-<groupId>com.h2database</groupId>
-<artifactId>h2</artifactId>
-</dependency>
-<dependency>
-<groupId>org.springframework</groupId>
-<artifactId>spring-beans</artifactId>
-</dependency>
-<dependency>
-<groupId>org.springframework</groupId>
-<artifactId>spring-context</artifactId>
-</dependency>
-<dependency>
-<groupId>org.springframework</groupId>
-<artifactId>spring-core</artifactId>
-</dependency>
-<dependency>
-<groupId>org.springframework</groupId>
-<artifactId>spring-aop</artifactId>
-</dependency>
-<dependency>
-<groupId>org.aspectj</groupId>
-<artifactId>aspectjweaver</artifactId>
-</dependency>
-<dependency>
-<groupId>org.apache.commons</groupId>
-<artifactId>commons-collections4</artifactId>
+<groupId>org.jetbrains.xodus</groupId>
+<artifactId>xodus-entity-store</artifactId>
+<version>${xodus.version}</version>
</dependency>
</dependencies>
</project>


@@ -1,228 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.util.NiFiProperties;
import org.h2.jdbcx.JdbcConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import java.io.File;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
/**
*
*/
public class AuditDataSourceFactoryBean implements FactoryBean {
private static final Logger logger = LoggerFactory.getLogger(AuditDataSourceFactoryBean.class);
private static final String NF_USERNAME_PASSWORD = "nf";
private static final int MAX_CONNECTIONS = 5;
// database file name
private static final String AUDIT_DATABASE_FILE_NAME = "nifi-flow-audit";
// ------------
// action table
// ------------
private static final String CREATE_ACTION_TABLE = "CREATE TABLE ACTION ("
+ "ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
+ "IDENTITY VARCHAR2(4096) NOT NULL, "
+ "SOURCE_ID VARCHAR2(100) NOT NULL, "
+ "SOURCE_NAME VARCHAR2(1000) NOT NULL, "
+ "SOURCE_TYPE VARCHAR2(1000) NOT NULL, "
+ "OPERATION VARCHAR2(50) NOT NULL, "
+ "ACTION_TIMESTAMP TIMESTAMP NOT NULL "
+ ")";
// -----------------
// component details
// -----------------
private static final String CREATE_PROCESSOR_DETAILS_TABLE = "CREATE TABLE PROCESSOR_DETAILS ("
+ "ACTION_ID INT NOT NULL PRIMARY KEY, "
+ "TYPE VARCHAR2(1000) NOT NULL, "
+ "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)"
+ ")";
private static final String CREATE_REMOTE_PROCESS_GROUP_DETAILS_TABLE = "CREATE TABLE REMOTE_PROCESS_GROUP_DETAILS ("
+ "ACTION_ID INT NOT NULL PRIMARY KEY, "
+ "URI VARCHAR2(2500) NOT NULL, "
+ "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)"
+ ")";
// --------------
// action details
// --------------
private static final String CREATE_MOVE_DETAILS_TABLE = "CREATE TABLE MOVE_DETAILS ("
+ "ACTION_ID INT NOT NULL PRIMARY KEY, "
+ "GROUP_ID VARCHAR2(100) NOT NULL, "
+ "GROUP_NAME VARCHAR2(1000) NOT NULL, "
+ "PREVIOUS_GROUP_ID VARCHAR2(100) NOT NULL, "
+ "PREVIOUS_GROUP_NAME VARCHAR2(1000) NOT NULL, "
+ "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)"
+ ")";
private static final String CREATE_CONFIGURE_DETAILS_TABLE = "CREATE TABLE CONFIGURE_DETAILS ("
+ "ACTION_ID INT NOT NULL PRIMARY KEY, "
+ "NAME VARCHAR2(1000) NOT NULL, "
+ "\"VALUE\" VARCHAR2(5000), "
+ "PREVIOUS_VALUE VARCHAR2(5000), "
+ "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)"
+ ")";
private static final String CREATE_CONNECT_DETAILS_TABLE = "CREATE TABLE CONNECT_DETAILS ("
+ "ACTION_ID INT NOT NULL PRIMARY KEY, "
+ "SOURCE_ID VARCHAR2(100) NOT NULL, "
+ "SOURCE_NAME VARCHAR2(1000), "
+ "SOURCE_TYPE VARCHAR2(1000) NOT NULL, "
+ "RELATIONSHIP VARCHAR2(1000), "
+ "DESTINATION_ID VARCHAR2(100) NOT NULL, "
+ "DESTINATION_NAME VARCHAR2(1000), "
+ "DESTINATION_TYPE VARCHAR2(1000) NOT NULL, "
+ "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)"
+ ")";
private static final String CREATE_PURGE_DETAILS_TABLE = "CREATE TABLE PURGE_DETAILS ("
+ "ACTION_ID INT NOT NULL PRIMARY KEY, "
+ "END_DATE TIMESTAMP NOT NULL, "
+ "FOREIGN KEY (ACTION_ID) REFERENCES ACTION(ID)"
+ ")";
private JdbcConnectionPool connectionPool;
private NiFiProperties properties;
@Override
public Object getObject() throws Exception {
if (connectionPool == null) {
// locate the repository directory
String repositoryDirectoryPath = properties.getProperty(NiFiProperties.REPOSITORY_DATABASE_DIRECTORY);
// ensure the repository directory is specified
if (repositoryDirectoryPath == null) {
throw new NullPointerException("Database directory must be specified.");
}
// create a handle to the repository directory
File repositoryDirectory = new File(repositoryDirectoryPath);
// get a handle to the database file
File dbFileNoExtension = new File(repositoryDirectory, AUDIT_DATABASE_FILE_NAME);
// format the database url
String databaseUrl = "jdbc:h2:" + dbFileNoExtension + ";AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3";
String databaseUrlAppend = properties.getProperty(NiFiProperties.H2_URL_APPEND);
if (StringUtils.isNotBlank(databaseUrlAppend)) {
databaseUrl += databaseUrlAppend;
}
// create the pool
connectionPool = JdbcConnectionPool.create(databaseUrl, NF_USERNAME_PASSWORD, NF_USERNAME_PASSWORD);
connectionPool.setMaxConnections(MAX_CONNECTIONS);
Connection connection = null;
ResultSet rs = null;
Statement statement = null;
try {
// get a connection
connection = connectionPool.getConnection();
final boolean isAutoCommit = connection.getAutoCommit();
if (isAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
logger.debug("setAutoCommit(false) not supported by this driver");
}
}
// create a statement for initializing the database
statement = connection.createStatement();
// determine if the tables need to be created
rs = connection.getMetaData().getTables(null, null, "ACTION", null);
if (!rs.next()) {
logger.info("Database not built for repository: " + databaseUrl + ". Building now...");
RepositoryUtils.closeQuietly(rs);
// action table
statement.execute(CREATE_ACTION_TABLE);
// component details
statement.execute(CREATE_PROCESSOR_DETAILS_TABLE);
statement.execute(CREATE_REMOTE_PROCESS_GROUP_DETAILS_TABLE);
// action details
statement.execute(CREATE_MOVE_DETAILS_TABLE);
statement.execute(CREATE_CONFIGURE_DETAILS_TABLE);
statement.execute(CREATE_CONNECT_DETAILS_TABLE);
statement.execute(CREATE_PURGE_DETAILS_TABLE);
}
// commit any changes
connection.commit();
} catch (SQLException sqle) {
RepositoryUtils.rollback(connection, logger);
throw sqle;
} finally {
RepositoryUtils.closeQuietly(rs);
RepositoryUtils.closeQuietly(statement);
RepositoryUtils.closeQuietly(connection);
}
}
return connectionPool;
}
@Override
public Class getObjectType() {
return JdbcConnectionPool.class;
}
@Override
public boolean isSingleton() {
return true;
}
public void setProperties(NiFiProperties properties) {
this.properties = properties;
}
/**
* Disposes resources.
*/
public void shutdown() {
// shutdown the connection pool
if (connectionPool != null) {
try {
connectionPool.dispose();
} catch (Exception e) {
logger.warn("Unable to dispose of connection pool: " + e.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, e);
}
}
}
}
}


@@ -1,90 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
/**
* A utility class for useful methods dealing with the repository
*
*/
public class RepositoryUtils {
public static void rollback(final Connection conn, final Logger logger) {
try {
if (null != conn) {
conn.rollback();
}
} catch (final SQLException sqe) {
logger.warn("The following problem occurred while trying to rollback " + conn + ": " + sqe.getLocalizedMessage());
if (logger.isDebugEnabled()) {
logger.debug("", sqe);
}
}
}
/**
* Closes the given statement quietly - no logging, no exceptions
*
* @param statement to close
*/
public static void closeQuietly(final Statement statement) {
if (null != statement) {
try {
statement.close();
} catch (final SQLException se) { /*IGNORE*/
}
}
}
/**
* Closes the given result set quietly - no logging, no exceptions
*
* @param resultSet to close
*/
public static void closeQuietly(final ResultSet resultSet) {
if (null != resultSet) {
try {
resultSet.close();
} catch (final SQLException se) {/*IGNORE*/
}
}
}
/**
* Closes the given connection quietly - no logging, no exceptions
*
* @param conn to close
*/
public static void closeQuietly(final Connection conn) {
if (null != conn) {
try {
conn.close();
} catch (final SQLException se) {/*IGNORE*/
}
}
}
}


@@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.dao;
import org.apache.nifi.action.Action;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Action data access.
*/
public interface ActionDAO {
/**
* Persists the specified action.
*
* @param action to persist
* @return the created Action with it's id
* @throws DataAccessException if unable to persist
*/
Action createAction(Action action) throws DataAccessException;
/**
* Finds all actions that meet the specified criteria.
*
* @param actionQuery query for actions
* @return History of actions
* @throws DataAccessException dae
*/
History findActions(HistoryQuery actionQuery) throws DataAccessException;
/**
* @param componentId to get previous values of
* @return Finds the previous values for the specified property in the
* specified component. Returns empty list if there are none
*/
Map<String, List<PreviousValue>> getPreviousValues(String componentId);
/**
* Deletes the history of a component's property.
*
* @param propertyName the name of the property
* @param componentId to delete previous values of
*/
void deletePreviousValues(String propertyName, String componentId);
/**
* Finds the specified action.
*
* @param actionId action identifier
* @return Action specified
* @throws DataAccessException dae
*/
Action getAction(Integer actionId) throws DataAccessException;
/**
* Deletes all actions up to the specified end date.
*
* @param endDate date to stop deleting at
* @throws DataAccessException dae
*/
void deleteActions(Date endDate) throws DataAccessException;
}


@@ -1,25 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.dao;
/**
*
*/
public interface DAOFactory {
ActionDAO getActionDAO();
}


@@ -0,0 +1,636 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service;
import jetbrains.exodus.entitystore.Entity;
import jetbrains.exodus.entitystore.EntityId;
import jetbrains.exodus.entitystore.EntityIterable;
import jetbrains.exodus.entitystore.PersistentEntityStore;
import jetbrains.exodus.entitystore.PersistentEntityStores;
import jetbrains.exodus.entitystore.StoreTransaction;
import jetbrains.exodus.env.Environment;
import jetbrains.exodus.env.EnvironmentConfig;
import jetbrains.exodus.env.Environments;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.component.details.ExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeExtensionDetails;
import org.apache.nifi.action.component.details.FlowChangeRemoteProcessGroupDetails;
import org.apache.nifi.action.component.details.RemoteProcessGroupDetails;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.ConfigureDetails;
import org.apache.nifi.action.details.ConnectDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangeConnectDetails;
import org.apache.nifi.action.details.FlowChangeMoveDetails;
import org.apache.nifi.action.details.FlowChangePurgeDetails;
import org.apache.nifi.action.details.MoveDetails;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.admin.service.entity.ActionEntity;
import org.apache.nifi.admin.service.entity.ActionLink;
import org.apache.nifi.admin.service.entity.ConfigureDetailsEntity;
import org.apache.nifi.admin.service.entity.ConnectDetailsEntity;
import org.apache.nifi.admin.service.entity.EntityProperty;
import org.apache.nifi.admin.service.entity.EntityType;
import org.apache.nifi.admin.service.entity.MoveDetailsEntity;
import org.apache.nifi.admin.service.entity.PurgeDetailsEntity;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
/**
* Audit Service implementation based on JetBrains Xodus Entity Store
*/
public class EntityStoreAuditService implements AuditService, Closeable {
private static final long FIRST_START_TIME = 0;
private static final int PREVIOUS_VALUES_LIMIT = 5;
private static final String ASCENDING_SORT_ORDER = "ASC";
private static final int DEFAULT_COUNT = 100;
private static final String BACKUP_FILE_NAME_FORMAT = "%s.backup.%d";
private static final Logger logger = LoggerFactory.getLogger(EntityStoreAuditService.class);
private final PersistentEntityStore entityStore;
private final Environment environment;
/**
* Entity Store Audit Service constructor with required properties for persistent location
*
* @param directory Persistent Entity Store directory
*/
public EntityStoreAuditService(final File directory) {
environment = loadEnvironment(directory);
entityStore = PersistentEntityStores.newInstance(environment);
logger.info("Environment configured with directory [{}]", directory);
}
/**
* Add Actions to Persistent Store
*
* @param actions Collections of Actions to be added
*/
@Override
public void addActions(final Collection<Action> actions) {
Objects.requireNonNull(actions, "Actions required");
entityStore.executeInExclusiveTransaction(storeTransaction -> {
for (final Action action : actions) {
addAction(storeTransaction, action);
}
logger.debug("Actions added [{}]", actions.size());
});
}
/**
* Get Previous Values for specified Component Identifier
*
* @param componentId Component Identifier for which previous property values should be retrieved
* @return Map of Property Name to List of Previous Property Values
*/
@Override
public Map<String, List<PreviousValue>> getPreviousValues(final String componentId) {
Objects.requireNonNull(componentId, "Component Identifier required");
return entityStore.computeInReadonlyTransaction(storeTransaction -> {
final Map<String, List<PreviousValue>> previousValuesFound = new LinkedHashMap<>();
final EntityIterable actionEntities = storeTransaction.find(EntityType.ACTION.getEntityType(), ActionEntity.SOURCE_ID.getProperty(), componentId);
for (Entity actionEntity : actionEntities) {
final Entity configureDetails = actionEntity.getLink(ActionLink.CONFIGURE_DETAILS.getProperty());
if (configureDetails != null) {
final String name = getProperty(configureDetails, ConfigureDetailsEntity.NAME);
final String value = getProperty(configureDetails, ConfigureDetailsEntity.VALUE);
final PreviousValue previousValue = new PreviousValue();
previousValue.setPreviousValue(value);
previousValue.setUserIdentity(getProperty(actionEntity, ActionEntity.USER_IDENTITY));
previousValue.setTimestamp(getDateProperty(actionEntity, ActionEntity.TIMESTAMP));
final List<PreviousValue> previousValues = previousValuesFound.get(name);
if (previousValues == null) {
final List<PreviousValue> newPreviousValues = new ArrayList<>();
newPreviousValues.add(previousValue);
previousValuesFound.put(name, newPreviousValues);
} else if (previousValues.size() < PREVIOUS_VALUES_LIMIT) {
previousValues.add(previousValue);
}
}
}
return previousValuesFound;
});
}
/**
* Delete Previous Values for specified Component Identifier and Property
*
* @param propertyName Name of the property for which Previous Values should be deleted
* @param componentId Component Identifier for which Previous Values should be deleted
*/
@Override
public void deletePreviousValues(final String propertyName, final String componentId) {
Objects.requireNonNull(propertyName, "Property Name required");
Objects.requireNonNull(componentId, "Component Identifier required");
entityStore.executeInExclusiveTransaction(storeTransaction -> {
final EntityIterable actionEntities = storeTransaction.find(EntityType.ACTION.getEntityType(), ActionEntity.SOURCE_ID.getProperty(), componentId);
for (Entity actionEntity : actionEntities) {
final Entity configureDetails = actionEntity.getLink(ActionLink.CONFIGURE_DETAILS.getProperty());
if (configureDetails != null) {
final Comparable<?> configureDetailsName = configureDetails.getProperty(ConfigureDetailsEntity.NAME.getProperty());
if (propertyName.equals(configureDetailsName)) {
actionEntity.deleteLinks(ActionLink.CONFIGURE_DETAILS.getProperty());
}
}
}
});
logger.info("Component [{}] Previous Property Values deleted", componentId);
}
/**
* Get Actions based on query parameters
*
* @param actionQuery History Action Query
* @return Actions found
*/
@Override
public History getActions(final HistoryQuery actionQuery) {
Objects.requireNonNull(actionQuery, "History Query required");
return entityStore.computeInReadonlyTransaction(storeTransaction -> {
final Collection<Action> actionsFound = new ArrayList<>();
final EntityIterable entities = findActionEntities(actionQuery, storeTransaction);
final int total = Math.toIntExact(entities.size());
final Integer queryOffset = actionQuery.getOffset();
final int skip = Objects.requireNonNullElse(queryOffset, 0);
final Integer queryCount = actionQuery.getCount();
final int limit = Objects.requireNonNullElse(queryCount, DEFAULT_COUNT);
final EntityIterable selected = entities.skip(skip).take(limit);
for (final Entity entity : selected) {
final Action action = readAction(entity);
actionsFound.add(action);
}
final History history = new History();
history.setActions(actionsFound);
history.setTotal(total);
history.setLastRefreshed(new Date());
return history;
});
}
/**
* Get Actions starting with indicated Action Identifier
*
* @param firstActionId First Action Identifier
* @param maxActions Maximum number of Actions to be returned
* @return Actions found
*/
@Override
public History getActions(final int firstActionId, final int maxActions) {
return entityStore.computeInReadonlyTransaction(storeTransaction -> {
final Collection<Action> actions = new ArrayList<>();
final int lastActionId = firstActionId + maxActions;
final EntityIterable found = storeTransaction.findIds(EntityType.ACTION.getEntityType(), firstActionId, lastActionId);
for (final Entity entity : found) {
final Action action = readAction(entity);
actions.add(action);
}
final History history = new History();
history.setActions(actions);
history.setLastRefreshed(new Date());
history.setTotal(actions.size());
return history;
});
}
/**
* Get Action for specified identifier
*
* @param actionId Action Identifier
* @return Action or null when not found
*/
@Override
public Action getAction(final Integer actionId) {
Objects.requireNonNull(actionId, "Action Identifier required");
return entityStore.computeInReadonlyTransaction(storeTransaction -> {
final Action action;
final EntityIterable found = storeTransaction.findIds(EntityType.ACTION.getEntityType(), actionId, actionId);
final Entity entity = found.getFirst();
if (entity == null) {
action = null;
} else {
action = readAction(entity);
}
return action;
});
}
/**
* Purge Actions from Persistent Entity Store clearing existing records and creating Purge Action
*
* @param end End time for purge action query
* @param purgeAction Purge Action to be recorded
*/
@Override
public void purgeActions(final Date end, final Action purgeAction) {
Objects.requireNonNull(end, "End date required");
Objects.requireNonNull(purgeAction, "Purge Action required");
final long endTime = end.getTime();
entityStore.executeInExclusiveTransaction(storeTransaction -> {
final EntityIterable entities = storeTransaction.find(EntityType.ACTION.getEntityType(), ActionEntity.TIMESTAMP.getProperty(), FIRST_START_TIME, endTime);
for (final Entity entity : entities) {
entity.delete();
for (final ActionLink actionLink : ActionLink.values()) {
entity.deleteLinks(actionLink.getProperty());
}
}
addAction(storeTransaction, purgeAction);
});
logger.info("User [{}] Purged Actions with end date [{}]", purgeAction.getUserIdentity(), end);
}
/**
* Close Persistent Entity Store resources
*
* @throws IOException Thrown on failures to close Entity Store
*/
@Override
public void close() throws IOException {
entityStore.close();
environment.close();
logger.info("Environment closed");
}
private EntityIterable findActionEntities(final HistoryQuery actionQuery, final StoreTransaction storeTransaction) {
final long startTimestamp;
final long endTimestamp;
final Date startDate = actionQuery.getStartDate();
if (startDate == null) {
startTimestamp = FIRST_START_TIME;
} else {
startTimestamp = startDate.getTime();
}
final Date endDate = actionQuery.getEndDate();
if (endDate == null) {
endTimestamp = System.currentTimeMillis();
} else {
endTimestamp = endDate.getTime();
}
final ActionEntity sortEntityProperty = getSortEntityProperty(actionQuery);
final boolean ascending = isAscending(actionQuery);
final EntityIterable sorted = storeTransaction.sort(EntityType.ACTION.getEntityType(), sortEntityProperty.getProperty(), ascending);
final EntityIterable entities = sorted.intersect(storeTransaction.find(EntityType.ACTION.getEntityType(), ActionEntity.TIMESTAMP.getProperty(), startTimestamp, endTimestamp));
final EntityIterable sourceEntities;
final String sourceId = actionQuery.getSourceId();
if (sourceId == null) {
sourceEntities = entities;
} else {
final EntityIterable sourceFiltered = storeTransaction.find(EntityType.ACTION.getEntityType(), ActionEntity.SOURCE_ID.getProperty(), sourceId);
sourceEntities = entities.intersect(sourceFiltered);
}
final EntityIterable filteredEntities;
final String userIdentity = actionQuery.getUserIdentity();
if (userIdentity == null) {
filteredEntities = sourceEntities;
} else {
final EntityIterable identityFiltered = storeTransaction.find(EntityType.ACTION.getEntityType(), ActionEntity.USER_IDENTITY.getProperty(), userIdentity);
filteredEntities = sourceEntities.intersect(identityFiltered);
}
return filteredEntities;
}
private boolean isAscending(final HistoryQuery historyQuery) {
final String sortOrder = historyQuery.getSortOrder();
final boolean ascending;
if (sortOrder == null || sortOrder.isEmpty()) {
ascending = false;
} else {
ascending = ASCENDING_SORT_ORDER.equalsIgnoreCase(sortOrder);
}
return ascending;
}
private ActionEntity getSortEntityProperty(final HistoryQuery historyQuery) {
final String sortColumn = historyQuery.getSortColumn();
final ActionEntity sortEntityProperty;
if (sortColumn == null || sortColumn.isEmpty()) {
sortEntityProperty = ActionEntity.TIMESTAMP;
} else {
ActionEntity foundActionEntity = null;
for (final ActionEntity actionEntity : ActionEntity.values()) {
if (actionEntity.getProperty().equals(sortColumn)) {
foundActionEntity = actionEntity;
break;
}
}
if (foundActionEntity == null) {
throw new IllegalArgumentException("Specified Sort Column not supported");
} else {
sortEntityProperty = foundActionEntity;
}
}
return sortEntityProperty;
}
private void addAction(final StoreTransaction storeTransaction, final Action action) {
final Entity actionEntity = storeTransaction.newEntity(EntityType.ACTION.getEntityType());
actionEntity.setProperty(ActionEntity.TIMESTAMP.getProperty(), action.getTimestamp().getTime());
actionEntity.setProperty(ActionEntity.USER_IDENTITY.getProperty(), action.getUserIdentity());
actionEntity.setProperty(ActionEntity.SOURCE_ID.getProperty(), action.getSourceId());
actionEntity.setProperty(ActionEntity.SOURCE_NAME.getProperty(), action.getSourceName());
actionEntity.setProperty(ActionEntity.SOURCE_TYPE.getProperty(), action.getSourceType().name());
actionEntity.setProperty(ActionEntity.OPERATION.getProperty(), action.getOperation().name());
final ComponentDetails componentDetails = action.getComponentDetails();
addComponentDetails(actionEntity, componentDetails);
final ActionDetails actionDetails = action.getActionDetails();
addActionDetails(storeTransaction, actionEntity, actionDetails);
}
private void addComponentDetails(final Entity actionEntity, final ComponentDetails componentDetails) {
if (componentDetails instanceof ExtensionDetails extensionDetails) {
actionEntity.setProperty(ActionEntity.EXTENSION_TYPE.getProperty(), extensionDetails.getType());
} else if (componentDetails instanceof RemoteProcessGroupDetails remoteProcessGroupDetails) {
actionEntity.setProperty(ActionEntity.REMOTE_PROCESS_GROUP_URI.getProperty(), remoteProcessGroupDetails.getUri());
}
}
private void addActionDetails(final StoreTransaction storeTransaction, final Entity actionEntity, final ActionDetails actionDetails) {
if (actionDetails instanceof ConnectDetails connectDetails) {
addConnectDetails(storeTransaction, actionEntity, connectDetails);
} else if (actionDetails instanceof MoveDetails moveDetails) {
addMoveDetails(storeTransaction, actionEntity, moveDetails);
} else if (actionDetails instanceof ConfigureDetails configureDetails) {
addConfigureDetails(storeTransaction, actionEntity, configureDetails);
} else if (actionDetails instanceof PurgeDetails purgeDetails) {
addPurgeDetails(storeTransaction, actionEntity, purgeDetails);
}
}
private void addConnectDetails(final StoreTransaction storeTransaction, final Entity actionEntity, final ConnectDetails connectDetails) {
final Entity connectDetailsEntity = storeTransaction.newEntity(EntityType.CONNECT_DETAILS.getEntityType());
connectDetailsEntity.setLink(ConnectDetailsEntity.ACTION.getProperty(), actionEntity);
actionEntity.setLink(ActionLink.CONNECT_DETAILS.getProperty(), connectDetailsEntity);
connectDetailsEntity.setProperty(ConnectDetailsEntity.SOURCE_ID.getProperty(), connectDetails.getSourceId());
connectDetailsEntity.setProperty(ConnectDetailsEntity.SOURCE_NAME.getProperty(), connectDetails.getSourceName());
connectDetailsEntity.setProperty(ConnectDetailsEntity.SOURCE_TYPE.getProperty(), connectDetails.getSourceType().name());
connectDetailsEntity.setProperty(ConnectDetailsEntity.DESTINATION_ID.getProperty(), connectDetails.getDestinationId());
connectDetailsEntity.setProperty(ConnectDetailsEntity.DESTINATION_NAME.getProperty(), connectDetails.getDestinationName());
connectDetailsEntity.setProperty(ConnectDetailsEntity.DESTINATION_TYPE.getProperty(), connectDetails.getDestinationType().name());
connectDetailsEntity.setProperty(ConnectDetailsEntity.RELATIONSHIP.getProperty(), connectDetails.getRelationship());
}
private void addMoveDetails(final StoreTransaction storeTransaction, final Entity actionEntity, final MoveDetails moveDetails) {
final Entity moveDetailsEntity = storeTransaction.newEntity(EntityType.MOVE_DETAILS.getEntityType());
moveDetailsEntity.setLink(MoveDetailsEntity.ACTION.getProperty(), actionEntity);
actionEntity.setLink(ActionLink.MOVE_DETAILS.getProperty(), moveDetailsEntity);
moveDetailsEntity.setProperty(MoveDetailsEntity.GROUP.getProperty(), moveDetails.getGroup());
moveDetailsEntity.setProperty(MoveDetailsEntity.GROUP_ID.getProperty(), moveDetails.getGroupId());
moveDetailsEntity.setProperty(MoveDetailsEntity.PREVIOUS_GROUP.getProperty(), moveDetails.getPreviousGroup());
moveDetailsEntity.setProperty(MoveDetailsEntity.PREVIOUS_GROUP_ID.getProperty(), moveDetails.getPreviousGroupId());
}
private void addConfigureDetails(final StoreTransaction storeTransaction, final Entity actionEntity, final ConfigureDetails configureDetails) {
final Entity configureDetailsEntity = storeTransaction.newEntity(EntityType.CONFIGURE_DETAILS.getEntityType());
configureDetailsEntity.setLink(MoveDetailsEntity.ACTION.getProperty(), actionEntity);
actionEntity.setLink(ActionLink.CONFIGURE_DETAILS.getProperty(), configureDetailsEntity);
configureDetailsEntity.setProperty(ConfigureDetailsEntity.NAME.getProperty(), configureDetails.getName());
final String previousValue = configureDetails.getPreviousValue();
if (previousValue != null) {
configureDetailsEntity.setProperty(ConfigureDetailsEntity.PREVIOUS_VALUE.getProperty(), previousValue);
}
final String value = configureDetails.getValue();
if (value != null) {
configureDetailsEntity.setProperty(ConfigureDetailsEntity.VALUE.getProperty(), value);
}
}
private void addPurgeDetails(final StoreTransaction storeTransaction, final Entity actionEntity, final PurgeDetails purgeDetails) {
final Entity purgeDetailsEntity = storeTransaction.newEntity(EntityType.PURGE_DETAILS.getEntityType());
purgeDetailsEntity.setLink(PurgeDetailsEntity.ACTION.getProperty(), actionEntity);
actionEntity.setLink(ActionLink.PURGE_DETAILS.getProperty(), purgeDetailsEntity);
purgeDetailsEntity.setProperty(PurgeDetailsEntity.END_DATE.getProperty(), purgeDetails.getEndDate().getTime());
}
private Action readAction(final Entity entity) {
final FlowChangeAction action = new FlowChangeAction();
final EntityId entityId = entity.getId();
final int id = Math.toIntExact(entityId.getLocalId());
action.setId(id);
action.setUserIdentity(getProperty(entity, ActionEntity.USER_IDENTITY));
action.setSourceId(getProperty(entity, ActionEntity.SOURCE_ID));
action.setSourceName(getProperty(entity, ActionEntity.SOURCE_NAME));
action.setTimestamp(getDateProperty(entity, ActionEntity.TIMESTAMP));
action.setSourceType(getEnumProperty(entity, ActionEntity.SOURCE_TYPE, Component.class));
action.setOperation(getEnumProperty(entity, ActionEntity.OPERATION, Operation.class));
final String extensionType = getProperty(entity, ActionEntity.EXTENSION_TYPE);
if (extensionType != null) {
final FlowChangeExtensionDetails extensionDetails = new FlowChangeExtensionDetails();
extensionDetails.setType(extensionType);
action.setComponentDetails(extensionDetails);
}
final String remoteProgressGroupUri = getProperty(entity, ActionEntity.REMOTE_PROCESS_GROUP_URI);
if (remoteProgressGroupUri != null) {
final FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProgressGroupUri);
action.setComponentDetails(remoteProcessGroupDetails);
}
final Entity purgeDetailsEntity = entity.getLink(ActionLink.PURGE_DETAILS.getProperty());
if (purgeDetailsEntity != null) {
final FlowChangePurgeDetails purgeDetails = new FlowChangePurgeDetails();
purgeDetails.setEndDate(getDateProperty(purgeDetailsEntity, PurgeDetailsEntity.END_DATE));
action.setActionDetails(purgeDetails);
}
final Entity configureDetailsEntity = entity.getLink(ActionLink.CONFIGURE_DETAILS.getProperty());
if (configureDetailsEntity != null) {
final ConfigureDetails configureDetails = getConfigureDetails(configureDetailsEntity);
action.setActionDetails(configureDetails);
}
final Entity connectDetailsEntity = entity.getLink(ActionLink.CONNECT_DETAILS.getProperty());
if (connectDetailsEntity != null) {
final ConnectDetails connectDetails = getConnectDetails(connectDetailsEntity);
action.setActionDetails(connectDetails);
}
final Entity moveDetailsEntity = entity.getLink(ActionLink.MOVE_DETAILS.getProperty());
if (moveDetailsEntity != null) {
final MoveDetails moveDetails = getMoveDetails(moveDetailsEntity);
action.setActionDetails(moveDetails);
}
return action;
}
private ConfigureDetails getConfigureDetails(final Entity configureDetailsEntity) {
final FlowChangeConfigureDetails configureDetails = new FlowChangeConfigureDetails();
configureDetails.setName(getProperty(configureDetailsEntity, ConfigureDetailsEntity.NAME));
configureDetails.setPreviousValue(getProperty(configureDetailsEntity, ConfigureDetailsEntity.PREVIOUS_VALUE));
configureDetails.setValue(getProperty(configureDetailsEntity, ConfigureDetailsEntity.VALUE));
return configureDetails;
}
private ConnectDetails getConnectDetails(final Entity connectDetailsEntity) {
final FlowChangeConnectDetails connectDetails = new FlowChangeConnectDetails();
connectDetails.setSourceId(getProperty(connectDetailsEntity, ConnectDetailsEntity.SOURCE_ID));
connectDetails.setSourceName(getProperty(connectDetailsEntity, ConnectDetailsEntity.SOURCE_NAME));
connectDetails.setSourceType(getEnumProperty(connectDetailsEntity, ConnectDetailsEntity.SOURCE_TYPE, Component.class));
connectDetails.setDestinationId(getProperty(connectDetailsEntity, ConnectDetailsEntity.DESTINATION_ID));
connectDetails.setDestinationName(getProperty(connectDetailsEntity, ConnectDetailsEntity.DESTINATION_NAME));
connectDetails.setDestinationType(getEnumProperty(connectDetailsEntity, ConnectDetailsEntity.DESTINATION_TYPE, Component.class));
connectDetails.setRelationship(getProperty(connectDetailsEntity, ConnectDetailsEntity.RELATIONSHIP));
return connectDetails;
}
private MoveDetails getMoveDetails(final Entity moveDetailsEntity) {
final FlowChangeMoveDetails moveDetails = new FlowChangeMoveDetails();
moveDetails.setGroup(getProperty(moveDetailsEntity, MoveDetailsEntity.GROUP));
moveDetails.setGroupId(getProperty(moveDetailsEntity, MoveDetailsEntity.GROUP_ID));
moveDetails.setPreviousGroup(getProperty(moveDetailsEntity, MoveDetailsEntity.PREVIOUS_GROUP));
moveDetails.setPreviousGroupId(getProperty(moveDetailsEntity, MoveDetailsEntity.PREVIOUS_GROUP_ID));
return moveDetails;
}
private String getProperty(final Entity entity, final EntityProperty entityProperty) {
final Comparable<?> property = entity.getProperty(entityProperty.getProperty());
return property == null ? null : property.toString();
}
private <T extends Enum<T>> T getEnumProperty(final Entity entity, final EntityProperty entityProperty, final Class<T> enumType) {
final Comparable<?> property = entity.getProperty(entityProperty.getProperty());
return property == null ? null : Enum.valueOf(enumType, property.toString());
}
private Date getDateProperty(final Entity entity, final EntityProperty entityProperty) {
final Comparable<?> property = entity.getProperty(entityProperty.getProperty());
final Date dateProperty;
if (property instanceof Long) {
final long milliseconds = (Long) property;
dateProperty = new Date(milliseconds);
} else {
dateProperty = null;
}
return dateProperty;
}
private Environment loadEnvironment(final File directory) {
final EnvironmentConfig environmentConfig = new EnvironmentConfig();
Environment loadedEnvironment;
try {
loadedEnvironment = Environments.newInstance(directory, environmentConfig);
} catch (final Exception e) {
logger.warn("Environment loading failed with directory [{}]", directory, e);
try (Stream<Path> files = Files.list(directory.toPath())) {
final List<Path> environmentFiles = files.filter(Files::isRegularFile).toList();
final long now = System.currentTimeMillis();
for (final Path environmentFile : environmentFiles) {
final String backupFileName = String.format(BACKUP_FILE_NAME_FORMAT, environmentFile.getFileName().toString(), now);
final Path backupStoreFile = environmentFile.resolveSibling(backupFileName);
try {
Files.move(environmentFile, backupStoreFile, StandardCopyOption.REPLACE_EXISTING);
logger.warn("Moved Environment file [{}] to [{}]", environmentFile, backupStoreFile);
} catch (final IOException ioe) {
throw new UncheckedIOException(String.format("Environment file move failed [%s]", environmentFile), ioe);
}
}
} catch (final IOException ioe) {
throw new UncheckedIOException(String.format("Environment directory listing failed [%s]", directory), ioe);
}
// Retry loading after directory cleanup
loadedEnvironment = Environments.newInstance(directory, environmentConfig);
}
return loadedEnvironment;
}
}
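
To make the new API concrete, a short hypothetical usage sketch of the service defined above: it records one configuration action and reads the history back with a default query, which the class sorts by timestamp descending and caps at 100 entries. The user identity and component identifiers are placeholders, and the sketch assumes the existing FlowChangeAction, Component, Operation, and HistoryQuery classes already imported by the class above.

import java.io.File;
import java.util.Date;
import java.util.List;

import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.admin.service.EntityStoreAuditService;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;

public class EntityStoreAuditServiceUsageSketch {
    public static void main(final String[] args) throws Exception {
        try (final EntityStoreAuditService auditService = new EntityStoreAuditService(new File("./database_repository"))) {
            // Record a single configuration change; identifiers and user name are placeholders
            final FlowChangeAction action = new FlowChangeAction();
            action.setUserIdentity("admin");
            action.setSourceId("processor-1234");
            action.setSourceName("GenerateFlowFile");
            action.setSourceType(Component.Processor);
            action.setOperation(Operation.Configure);
            action.setTimestamp(new Date());
            auditService.addActions(List.of(action));

            // Default HistoryQuery: no filters, timestamp descending, up to 100 entries
            final History history = auditService.getActions(new HistoryQuery());
            System.out.println("Actions recorded: " + history.getTotal());
        }
    }
}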


@@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.action;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import java.util.Collection;
/**
* Adds the specified actions.
*/
public class AddActionsAction implements AdministrationAction<Void> {
private final Collection<Action> actions;
public AddActionsAction(Collection<Action> actions) {
this.actions = actions;
}
@Override
public Void execute(DAOFactory daoFactory) {
ActionDAO actionDao = daoFactory.getActionDAO();
// add each action
for (Action action : actions) {
actionDao.createAction(action);
}
return null;
}
}


@@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
/**
* Purges actions up to a specified end date.
*/
public class DeletePreviousValues implements AdministrationAction<Void> {
private final String propertyName;
private final String componentId;
public DeletePreviousValues(String propertyName, String componentId) {
this.propertyName = propertyName;
this.componentId = componentId;
}
@Override
public Void execute(DAOFactory daoFactory) {
ActionDAO actionDao = daoFactory.getActionDAO();
actionDao.deletePreviousValues(propertyName, componentId);
return null;
}
}


@@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import java.util.Date;
/**
* Get all actions that match the specified query.
*/
public class GetActionsAction implements AdministrationAction<History> {
private final HistoryQuery query;
public GetActionsAction(HistoryQuery query) {
this.query = query;
}
@Override
public History execute(DAOFactory daoFactory) {
ActionDAO actionDao = daoFactory.getActionDAO();
// find all matching history
History history = actionDao.findActions(query);
history.setLastRefreshed(new Date());
return history;
}
}


@@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import org.apache.nifi.history.PreviousValue;
import java.util.List;
import java.util.Map;
/**
* Gets the action with the specified id.
*/
public class GetPreviousValues implements AdministrationAction<Map<String, List<PreviousValue>>> {
private final String componentId;
public GetPreviousValues(String componentId) {
this.componentId = componentId;
}
@Override
public Map<String, List<PreviousValue>> execute(DAOFactory daoFactory) {
ActionDAO actionDao = daoFactory.getActionDAO();
return actionDao.getPreviousValues(componentId);
}
}


@@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.action;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import java.util.Date;
/**
* Purges actions up to a specified end date.
*/
public class PurgeActionsAction implements AdministrationAction<Void> {
private final Date end;
private final Action purgeAction;
public PurgeActionsAction(Date end, Action purgeAction) {
this.end = end;
this.purgeAction = purgeAction;
}
@Override
public Void execute(DAOFactory daoFactory) {
ActionDAO actionDao = daoFactory.getActionDAO();
// remove the corresponding actions
actionDao.deleteActions(end);
// create a purge action
actionDao.createAction(purgeAction);
return null;
}
}


@@ -14,27 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.admin.service.transaction;
+package org.apache.nifi.admin.service.entity;
/**
-* Exception to indicate that the user account is disabled.
+* Enumeration of Action properties stored as Entity objects
*/
-public class TransactionException extends RuntimeException {
+public enum ActionEntity implements EntityProperty {
+TIMESTAMP("timestamp"),
-public TransactionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-super(message, cause, enableSuppression, writableStackTrace);
+USER_IDENTITY("userIdentity"),
+SOURCE_ID("sourceId"),
+SOURCE_NAME("sourceName"),
+SOURCE_TYPE("sourceType"),
+OPERATION("operation"),
+EXTENSION_TYPE("extensionType"),
+REMOTE_PROCESS_GROUP_URI("remoteProcessGroupUri");
+private final String property;
+ActionEntity(final String property) {
+this.property = property;
}
-public TransactionException(Throwable cause) {
-super(cause);
+@Override
+public String getProperty() {
+return property;
}
-public TransactionException(String message, Throwable cause) {
-super(message, cause);
-}
-public TransactionException(String message) {
-super(message);
-}
}


@@ -14,27 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.admin.service.action;
-import org.apache.nifi.action.Action;
-import org.apache.nifi.admin.dao.ActionDAO;
-import org.apache.nifi.admin.dao.DAOFactory;
+package org.apache.nifi.admin.service.entity;
/**
-* Gets the action with the specified id.
+* Enumeration of Action link properties for referencing other Entity objects
*/
-public class GetActionAction implements AdministrationAction<Action> {
+public enum ActionLink implements EntityProperty {
+CONNECT_DETAILS("connectDetails"),
-private final Integer id;
+MOVE_DETAILS("moveDetails"),
-public GetActionAction(Integer id) {
-this.id = id;
+CONFIGURE_DETAILS("configureDetails"),
+PURGE_DETAILS("purgeDetails");
+private final String property;
+ActionLink(final String property) {
+this.property = property;
}
@Override
-public Action execute(DAOFactory daoFactory) {
-ActionDAO actionDao = daoFactory.getActionDAO();
-return actionDao.getAction(id);
+public String getProperty() {
+return property;
}
}

View File

@ -14,34 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.user;
import org.apache.nifi.authorization.user.NiFiUser;
import java.util.Set;
/**
*
*/
public class NiFiUserGroup {
private String group;
private Set<NiFiUser> users;
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public Set<NiFiUser> getUsers() {
return users;
}
public void setUsers(Set<NiFiUser> users) {
this.users = users;
}
}

package org.apache.nifi.admin.service.entity;
/**
* Enumeration of Action Configure Details properties stored as Entity objects
*/
public enum ConfigureDetailsEntity implements EntityProperty {
ACTION("action"),
NAME("name"),
PREVIOUS_VALUE("previousValue"),
VALUE("value");
private final String property;
ConfigureDetailsEntity(final String property) {
this.property = property;
}
@Override
public String getProperty() {
return property;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.entity;
/**
* Enumeration of Action Connect Details properties stored as Entity objects
*/
public enum ConnectDetailsEntity implements EntityProperty {
ACTION("action"),
SOURCE_ID("sourceId"),
SOURCE_NAME("sourceName"),
SOURCE_TYPE("sourceType"),
DESTINATION_ID("destinationId"),
DESTINATION_NAME("destinationName"),
DESTINATION_TYPE("destinationType"),
RELATIONSHIP("relationship");
private final String property;
ConnectDetailsEntity(final String property) {
this.property = property;
}
@Override
public String getProperty() {
return property;
}
}

View File

@ -14,12 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.transaction;
/**
*
*/
public interface TransactionBuilder {
Transaction start() throws TransactionException;
}

package org.apache.nifi.admin.service.entity;
/**
* Abstraction for stored properties of persistent Entities
*/
public interface EntityProperty {
/**
* Get property name for an element of an Entity
*
* @return Property name
*/
String getProperty();
}

View File

@ -14,23 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.action;
import org.apache.nifi.admin.dao.DAOFactory;
/**
* Defines the administration action. Actions are provided a DAO factory and
* authority provider to perform a require action.
*
* @param <T> type
*/
public interface AdministrationAction<T> {
/**
* Performs an action using the specified DAOFactory and AuthorityProvider.
*
* @param daoFactory factory
* @return action result
*/
T execute(DAOFactory daoFactory);
}

package org.apache.nifi.admin.service.entity;
/**
* Enumeration of Persistent Entity Types
*/
public enum EntityType {
ACTION("Action"),
CONFIGURE_DETAILS("ConfigureDetails"),
CONNECT_DETAILS("ConnectDetails"),
EXTENSION_DETAILS("ExtensionDetails"),
MOVE_DETAILS("MoveDetails"),
PURGE_DETAILS("PurgeDetails");
private final String entityType;
EntityType(final String entityType) {
this.entityType = entityType;
}
public String getEntityType() {
return entityType;
}
}

View File

@ -14,22 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.dao;
/**
* Represents any error that might occur while administering NiFi accounts.
*/
public class DataAccessException extends RuntimeException {
public DataAccessException(Throwable cause) {
super(cause);
}
public DataAccessException(String message, Throwable cause) {
super(message, cause);
}
public DataAccessException(String message) {
super(message);
}
}

package org.apache.nifi.admin.service.entity;
/**
* Enumeration of Extension Details properties stored as Entity objects
*/
public enum ExtensionDetailsEntity implements EntityProperty {
EXTENSION_TYPE("extensionType");
private final String property;
ExtensionDetailsEntity(final String property) {
this.property = property;
}
@Override
public String getProperty() {
return property;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.entity;
/**
* Enumeration of Action Move Details properties stored as Entity objects
*/
public enum MoveDetailsEntity implements EntityProperty {
ACTION("action"),
GROUP("group"),
GROUP_ID("groupId"),
PREVIOUS_GROUP("previousGroup"),
PREVIOUS_GROUP_ID("previousGroupId");
private final String property;
MoveDetailsEntity(final String property) {
this.property = property;
}
@Override
public String getProperty() {
return property;
}
}

View File

@ -14,26 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.dao.impl;
import org.apache.nifi.admin.dao.ActionDAO;
import org.apache.nifi.admin.dao.DAOFactory;
import java.sql.Connection;
/**
*
*/
public class DAOFactoryImpl implements DAOFactory {
private final Connection connection;
public DAOFactoryImpl(Connection connection) {
this.connection = connection;
}
@Override
public ActionDAO getActionDAO() {
return new StandardActionDAO(connection);
}
}

package org.apache.nifi.admin.service.entity;
/**
* Enumeration of Action Purge Details properties stored as Entity objects
*/
public enum PurgeDetailsEntity implements EntityProperty {
ACTION("action"),
END_DATE("endDate");
private final String property;
PurgeDetailsEntity(final String property) {
this.property = property;
}
@Override
public String getProperty() {
return property;
}
}
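
The EntityProperty interface, the EntityType enumeration, and the ActionEntity, ActionLink, and details enums above name the properties and links used when flow configuration history records are mapped onto Xodus Entities. As a rough illustration of that mapping (a hypothetical sketch rather than code from this commit: the class name, directory path, and property values are placeholders), an audited Action could be written to a PersistentEntityStore along these lines:

import jetbrains.exodus.entitystore.Entity;
import jetbrains.exodus.entitystore.PersistentEntityStore;
import jetbrains.exodus.entitystore.PersistentEntityStores;

public class ActionEntitySketch {
    public static void main(final String[] args) {
        // Open (or create) a PersistentEntityStore in the configured database directory
        final PersistentEntityStore store = PersistentEntityStores.newInstance("./database_repository");
        try {
            store.executeInTransaction(txn -> {
                // One Entity per audited Action, typed using the EntityType.ACTION name
                final Entity action = txn.newEntity("Action");
                action.setProperty("timestamp", System.currentTimeMillis());
                action.setProperty("userIdentity", "admin");
                action.setProperty("sourceId", "00000000-0000-0000-0000-000000000000");
                action.setProperty("sourceName", "GenerateFlowFile");
                action.setProperty("sourceType", "Processor");
                action.setProperty("operation", "Add");

                // Detail records become separate Entities referenced through ActionLink names
                final Entity configureDetails = txn.newEntity("ConfigureDetails");
                configureDetails.setProperty("name", "FirstProperty");
                configureDetails.setProperty("value", "FirstValue");
                action.setLink("configureDetails", configureDetails);
            });
        } finally {
            store.close();
        }
    }
}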

View File

@ -1,255 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.impl;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.dao.DataAccessException;
import org.apache.nifi.admin.service.AdministrationException;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.action.AddActionsAction;
import org.apache.nifi.admin.service.action.DeletePreviousValues;
import org.apache.nifi.admin.service.action.GetActionAction;
import org.apache.nifi.admin.service.action.GetActionsAction;
import org.apache.nifi.admin.service.action.GetPreviousValues;
import org.apache.nifi.admin.service.action.PurgeActionsAction;
import org.apache.nifi.admin.service.transaction.Transaction;
import org.apache.nifi.admin.service.transaction.TransactionBuilder;
import org.apache.nifi.admin.service.transaction.TransactionException;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
*/
public class StandardAuditService implements AuditService {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private TransactionBuilder transactionBuilder;
@Override
public void addActions(Collection<Action> actions) {
Transaction transaction = null;
writeLock.lock();
try {
// start the transaction
transaction = transactionBuilder.start();
// seed the accounts
AddActionsAction addActions = new AddActionsAction(actions);
transaction.execute(addActions);
// commit the transaction
transaction.commit();
} catch (TransactionException | DataAccessException te) {
rollback(transaction);
throw new AdministrationException(te);
} catch (Throwable t) {
rollback(transaction);
throw t;
} finally {
closeQuietly(transaction);
writeLock.unlock();
}
}
@Override
public Map<String, List<PreviousValue>> getPreviousValues(String componentId) {
Transaction transaction = null;
Map<String, List<PreviousValue>> previousValues = null;
readLock.lock();
try {
// start the transaction
transaction = transactionBuilder.start();
// seed the accounts
GetPreviousValues getActions = new GetPreviousValues(componentId);
previousValues = transaction.execute(getActions);
// commit the transaction
transaction.commit();
} catch (TransactionException | DataAccessException te) {
rollback(transaction);
throw new AdministrationException(te);
} catch (Throwable t) {
rollback(transaction);
throw t;
} finally {
closeQuietly(transaction);
readLock.unlock();
}
return previousValues;
}
@Override
public void deletePreviousValues(String propertyName, String componentId) {
Transaction transaction = null;
readLock.lock();
try {
// start the transaction
transaction = transactionBuilder.start();
// seed the accounts
DeletePreviousValues deleteAction = new DeletePreviousValues(propertyName, componentId);
transaction.execute(deleteAction);
// commit the transaction
transaction.commit();
} catch (TransactionException | DataAccessException te) {
rollback(transaction);
throw new AdministrationException(te);
} catch (Throwable t) {
rollback(transaction);
throw t;
} finally {
closeQuietly(transaction);
readLock.unlock();
}
}
@Override
public History getActions(HistoryQuery query) {
Transaction transaction = null;
History history = null;
readLock.lock();
try {
// start the transaction
transaction = transactionBuilder.start();
// seed the accounts
GetActionsAction getActions = new GetActionsAction(query);
history = transaction.execute(getActions);
// commit the transaction
transaction.commit();
} catch (TransactionException | DataAccessException te) {
rollback(transaction);
throw new AdministrationException(te);
} catch (Throwable t) {
rollback(transaction);
throw t;
} finally {
closeQuietly(transaction);
readLock.unlock();
}
return history;
}
@Override
public History getActions(int firstActionId, int maxActions) {
final HistoryQuery query = new HistoryQuery();
query.setOffset(firstActionId);
query.setCount(maxActions);
query.setSortOrder("asc");
query.setSortColumn("timestamp");
return getActions(query);
}
@Override
public Action getAction(Integer actionId) {
Transaction transaction = null;
Action action = null;
readLock.lock();
try {
// start the transaction
transaction = transactionBuilder.start();
// seed the accounts
GetActionAction getAction = new GetActionAction(actionId);
action = transaction.execute(getAction);
// commit the transaction
transaction.commit();
} catch (TransactionException | DataAccessException te) {
rollback(transaction);
throw new AdministrationException(te);
} catch (Throwable t) {
rollback(transaction);
throw t;
} finally {
closeQuietly(transaction);
readLock.unlock();
}
return action;
}
@Override
public void purgeActions(Date end, Action purgeAction) {
Transaction transaction = null;
writeLock.lock();
try {
// start the transaction
transaction = transactionBuilder.start();
// purge the action database
PurgeActionsAction purgeActions = new PurgeActionsAction(end, purgeAction);
transaction.execute(purgeActions);
// commit the transaction
transaction.commit();
} catch (TransactionException | DataAccessException te) {
rollback(transaction);
throw new AdministrationException(te);
} catch (Throwable t) {
rollback(transaction);
throw t;
} finally {
closeQuietly(transaction);
writeLock.unlock();
}
}
private void rollback(Transaction transaction) {
if (transaction != null) {
transaction.rollback();
}
}
private void closeQuietly(final Transaction transaction) {
if (transaction != null) {
try {
transaction.close();
} catch (final IOException ioe) {
}
}
}
public void setTransactionBuilder(TransactionBuilder transactionBuilder) {
this.transactionBuilder = transactionBuilder;
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.transaction;
import java.io.Closeable;
import org.apache.nifi.admin.service.action.AdministrationAction;
/**
* Defines a transaction.
*/
public interface Transaction extends Closeable {
/**
* Executes the specified action within the current transaction.
*
* @param <T> type of action to execute
* @param action action to execute
* @return executed action
* @throws IllegalStateException - if there is no current transaction
*/
<T> T execute(AdministrationAction<T> action);
/**
* Commits the current transaction.
*
* @throws TransactionException - if the transaction is unable to be
* committed
*/
void commit() throws TransactionException;
/**
* Rolls back the current transaction.
*/
void rollback();
}

View File

@ -1,91 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.transaction.impl;
import org.apache.nifi.admin.RepositoryUtils;
import org.apache.nifi.admin.dao.DAOFactory;
import org.apache.nifi.admin.dao.impl.DAOFactoryImpl;
import org.apache.nifi.admin.service.action.AdministrationAction;
import org.apache.nifi.admin.service.transaction.Transaction;
import org.apache.nifi.admin.service.transaction.TransactionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
/**
* Transaction implementation that uses the specified SQL Connection and
* AuthorityProvider.
*/
public class StandardTransaction implements Transaction {
private static final Logger logger = LoggerFactory.getLogger(StandardTransaction.class);
private Connection connection;
public StandardTransaction(Connection connection) {
this.connection = connection;
}
@Override
public <T> T execute(AdministrationAction<T> action) {
// ensure the transaction has been started
if (connection == null) {
throw new IllegalStateException("This transaction is not active.");
}
// create a dao factory
DAOFactory daoFactory = new DAOFactoryImpl(connection);
// execute the specified action
return action.execute(daoFactory);
}
@Override
public void commit() throws TransactionException {
// ensure there is an active transaction
if (connection == null) {
throw new IllegalStateException("No active transaction.");
}
try {
// commit the transaction
connection.commit();
} catch (SQLException sqle) {
throw new TransactionException(sqle.getMessage());
}
}
@Override
public void rollback() {
// ensure there is an active transaction
if (connection != null) {
// rollback the transaction
RepositoryUtils.rollback(connection, logger);
}
}
@Override
public void close() throws IOException {
if (connection != null) {
RepositoryUtils.closeQuietly(connection);
connection = null;
}
}
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service.transaction.impl;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource;
import org.apache.nifi.admin.service.transaction.Transaction;
import org.apache.nifi.admin.service.transaction.TransactionBuilder;
import org.apache.nifi.admin.service.transaction.TransactionException;
/**
*
*/
public class StandardTransactionBuilder implements TransactionBuilder {
private DataSource dataSource;
@Override
public Transaction start() throws TransactionException {
try {
// get a new connection
Connection connection = dataSource.getConnection();
final boolean isAutoCommit = connection.getAutoCommit();
if (isAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
throw new TransactionException("setAutoCommit(false) not supported by this driver");
}
}
// create a new transaction
return new StandardTransaction(connection);
} catch (SQLException sqle) {
throw new TransactionException(sqle.getMessage());
}
}
/* setters */
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
}

View File

@ -1,36 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans default-lazy-init="true"
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!-- initialize the audit data source -->
<bean id="auditDataSource" class="org.apache.nifi.admin.AuditDataSourceFactoryBean" destroy-method="shutdown">
<property name="properties" ref="nifiProperties"/>
</bean>
<!-- initialize the audit transaction builder -->
<bean id="auditTransactionBuilder" class="org.apache.nifi.admin.service.transaction.impl.StandardTransactionBuilder">
<property name="dataSource" ref="auditDataSource"/>
</bean>
<!-- audit service -->
<bean id="auditService" class="org.apache.nifi.admin.service.impl.StandardAuditService">
<property name="transactionBuilder" ref="auditTransactionBuilder"/>
</bean>
</beans>

View File

@ -0,0 +1,369 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.admin.service;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
import org.apache.nifi.action.details.ActionDetails;
import org.apache.nifi.action.details.FlowChangeConfigureDetails;
import org.apache.nifi.action.details.FlowChangePurgeDetails;
import org.apache.nifi.action.details.PurgeDetails;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class EntityStoreAuditServiceTest {
private static final Date ACTION_TIMESTAMP = new Date(86400);
private static final Date SECOND_ACTION_TIMESTAMP = new Date(97500);
private static final Date PURGE_END_DATE = new Date(43200);
private static final String USER_IDENTITY = "admin";
private static final String SOURCE_ID = "00000000-0000-0000-0000-000000000000";
private static final String SECOND_SOURCE_ID = "01010101-0000-0000-0000-000000000000";
private static final String SOURCE_NAME = "GenerateFlowFile";
private static final Component SOURCE_TYPE = Component.Processor;
private static final Operation OPERATION = Operation.Add;
private static final int ACTION_ID = 0;
private static final int PURGE_ACTIONS = 1;
private static final String FIRST_PROPERTY_NAME = "FirstProperty";
private static final String SECOND_PROPERTY_NAME = "SecondProperty";
private static final String FIRST_VALUE = "FirstValue";
private static final String SECOND_VALUE = "SecondValue";
private static final String DATABASE_FILE_EXTENSION = ".xd";
@TempDir
File directory;
private EntityStoreAuditService service;
@BeforeEach
void setService() {
service = new EntityStoreAuditService(directory);
}
@AfterEach
void closeService() throws IOException {
service.close();
}
@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Moving the database lock file on Windows causes exceptions")
@Test
void testCorruptedFilesHandling() throws IOException {
service.close();
// Write invalid string
try (Stream<Path> files = Files.list(directory.toPath()).filter(file -> file.toString().endsWith(DATABASE_FILE_EXTENSION))) {
final Optional<Path> databaseFileFound = files.findFirst();
assertTrue(databaseFileFound.isPresent());
final Path databaseFile = databaseFileFound.get();
Files.writeString(databaseFile, SOURCE_ID);
}
// Create Service with corrupted directory
service = new EntityStoreAuditService(directory);
final Action action = newAction();
final Collection<Action> actions = Collections.singletonList(action);
service.addActions(actions);
}
@Test
void testGetActionNotFound() {
final Action action = service.getAction(ACTION_ID);
assertNull(action);
}
@Test
void testAddActionsGetAction() {
final Action action = newAction();
final Collection<Action> actions = Collections.singletonList(action);
service.addActions(actions);
final Action actionFound = service.getAction(ACTION_ID);
assertEquals(ACTION_ID, actionFound.getId());
assertActionFound(actionFound);
}
@Test
void testAddActionsGetActions() {
final Action firstAction = newAction();
final Action secondAction = newAction();
final Collection<Action> actions = Arrays.asList(firstAction, secondAction);
service.addActions(actions);
final History actionsHistory = service.getActions(ACTION_ID, actions.size());
assertNotNull(actionsHistory);
assertEquals(actions.size(), actionsHistory.getTotal());
assertNotNull(actionsHistory.getLastRefreshed());
final Collection<Action> actionsFound = actionsHistory.getActions();
assertNotNull(actionsFound);
for (final Action actionFound : actionsFound) {
assertActionFound(actionFound);
}
}
@Test
void testAddActionsGetActionsQueryUnfiltered() {
final Action firstAction = newAction();
final Action secondAction = newAction();
final Collection<Action> actions = Arrays.asList(firstAction, secondAction);
service.addActions(actions);
final HistoryQuery historyQuery = new HistoryQuery();
final History actionsHistory = service.getActions(historyQuery);
assertNotNull(actionsHistory);
assertEquals(actions.size(), actionsHistory.getTotal());
assertNotNull(actionsHistory.getLastRefreshed());
final Collection<Action> actionsFound = actionsHistory.getActions();
assertNotNull(actionsFound);
for (final Action actionFound : actionsFound) {
assertActionFound(actionFound);
}
}
@Test
void testAddActionsGetActionsQuerySourceNotFound() {
final Action firstAction = newAction();
final Action secondAction = newAction();
final Collection<Action> actions = Arrays.asList(firstAction, secondAction);
service.addActions(actions);
final HistoryQuery historyQuery = new HistoryQuery();
historyQuery.setSourceId(SECOND_SOURCE_ID);
final History actionsHistory = service.getActions(historyQuery);
assertNotNull(actionsHistory);
assertEquals(0, actionsHistory.getTotal());
assertNotNull(actionsHistory.getLastRefreshed());
final Collection<Action> actionsFound = actionsHistory.getActions();
assertNotNull(actionsFound);
assertTrue(actionsFound.isEmpty());
}
@Test
void testAddActionsGetActionsQueryStartDateFiltered() {
final FlowChangeAction firstAction = newAction();
final FlowChangeAction secondAction = newAction();
secondAction.setTimestamp(SECOND_ACTION_TIMESTAMP);
final Collection<Action> actions = Arrays.asList(firstAction, secondAction);
service.addActions(actions);
final HistoryQuery historyQuery = new HistoryQuery();
historyQuery.setStartDate(SECOND_ACTION_TIMESTAMP);
final History actionsHistory = service.getActions(historyQuery);
assertNotNull(actionsHistory);
assertEquals(1, actionsHistory.getTotal());
assertNotNull(actionsHistory.getLastRefreshed());
final Collection<Action> actionsFound = actionsHistory.getActions();
assertNotNull(actionsFound);
final Iterator<Action> actionsFiltered = actionsFound.iterator();
assertTrue(actionsFiltered.hasNext());
final Action firstActionFound = actionsFiltered.next();
assertEquals(SECOND_ACTION_TIMESTAMP, firstActionFound.getTimestamp());
assertFalse(actionsFiltered.hasNext());
}
@Test
void testAddActionsPurgeActionsGetAction() {
final Action action = newAction();
final Collection<Action> actions = Collections.singletonList(action);
service.addActions(actions);
final FlowChangeAction purgeAction = newAction();
purgeAction.setOperation(Operation.Purge);
final FlowChangePurgeDetails purgeDetails = new FlowChangePurgeDetails();
purgeDetails.setEndDate(PURGE_END_DATE);
purgeAction.setActionDetails(purgeDetails);
service.purgeActions(new Date(), purgeAction);
final History history = service.getActions(ACTION_ID, PURGE_ACTIONS);
assertNotNull(history);
assertEquals(PURGE_ACTIONS, history.getTotal());
final Iterator<Action> actionsFound = history.getActions().iterator();
assertTrue(actionsFound.hasNext());
final Action actionFound = actionsFound.next();
assertEquals(Operation.Purge, actionFound.getOperation());
final ActionDetails actionDetails = actionFound.getActionDetails();
assertInstanceOf(PurgeDetails.class, actionDetails);
final PurgeDetails purgeDetailsFound = (PurgeDetails) actionDetails;
assertEquals(PURGE_END_DATE, purgeDetailsFound.getEndDate());
}
@Test
void testAddActionsDeletePreviousValuesGetActions() {
final FlowChangeAction firstAction = newAction();
firstAction.setOperation(Operation.Configure);
final FlowChangeConfigureDetails firstConfigureDetails = new FlowChangeConfigureDetails();
firstConfigureDetails.setName(FIRST_PROPERTY_NAME);
firstConfigureDetails.setValue(FIRST_VALUE);
firstAction.setActionDetails(firstConfigureDetails);
final FlowChangeAction secondAction = newAction();
secondAction.setOperation(Operation.Configure);
final FlowChangeConfigureDetails secondConfigureDetails = new FlowChangeConfigureDetails();
secondConfigureDetails.setName(SECOND_PROPERTY_NAME);
secondConfigureDetails.setValue(SECOND_VALUE);
secondAction.setActionDetails(secondConfigureDetails);
final Collection<Action> actions = Arrays.asList(firstAction, secondAction);
service.addActions(actions);
service.deletePreviousValues(SECOND_PROPERTY_NAME, SOURCE_ID);
final History actionsHistory = service.getActions(ACTION_ID, Integer.MAX_VALUE);
assertNotNull(actionsHistory);
assertEquals(actions.size(), actionsHistory.getTotal());
final Iterator<Action> actionsFound = actionsHistory.getActions().iterator();
assertTrue(actionsFound.hasNext());
final Action firstActionFound = actionsFound.next();
assertNotNull(firstActionFound.getActionDetails());
assertTrue(actionsFound.hasNext());
final Action secondActionFound = actionsFound.next();
assertNull(secondActionFound.getActionDetails());
}
@Test
void testAddActionsGetPreviousValues() {
final FlowChangeAction firstAction = newAction();
firstAction.setOperation(Operation.Configure);
final FlowChangeConfigureDetails firstConfigureDetails = new FlowChangeConfigureDetails();
firstConfigureDetails.setName(FIRST_PROPERTY_NAME);
firstConfigureDetails.setValue(FIRST_VALUE);
firstAction.setActionDetails(firstConfigureDetails);
final FlowChangeAction secondAction = newAction();
secondAction.setOperation(Operation.Configure);
final FlowChangeConfigureDetails secondConfigureDetails = new FlowChangeConfigureDetails();
secondConfigureDetails.setName(SECOND_PROPERTY_NAME);
secondConfigureDetails.setValue(SECOND_VALUE);
secondAction.setActionDetails(secondConfigureDetails);
final Collection<Action> actions = Arrays.asList(firstAction, secondAction);
service.addActions(actions);
final Map<String, List<PreviousValue>> previousValues = service.getPreviousValues(SOURCE_ID);
assertNotNull(previousValues);
assertFalse(previousValues.isEmpty());
final List<PreviousValue> firstPreviousValues = previousValues.get(FIRST_PROPERTY_NAME);
assertNotNull(firstPreviousValues);
final PreviousValue firstPreviousValue = firstPreviousValues.get(0);
assertNotNull(firstPreviousValue);
assertEquals(FIRST_VALUE, firstPreviousValue.getPreviousValue());
assertNotNull(firstPreviousValue.getTimestamp());
assertEquals(USER_IDENTITY, firstPreviousValue.getUserIdentity());
final List<PreviousValue> secondPreviousValues = previousValues.get(SECOND_PROPERTY_NAME);
assertNotNull(secondPreviousValues);
final PreviousValue secondPreviousValue = secondPreviousValues.get(0);
assertNotNull(secondPreviousValue);
assertEquals(SECOND_VALUE, secondPreviousValue.getPreviousValue());
assertNotNull(secondPreviousValue.getTimestamp());
assertEquals(USER_IDENTITY, secondPreviousValue.getUserIdentity());
}
private FlowChangeAction newAction() {
final FlowChangeAction action = new FlowChangeAction();
action.setTimestamp(ACTION_TIMESTAMP);
action.setSourceId(SOURCE_ID);
action.setSourceName(SOURCE_NAME);
action.setSourceType(SOURCE_TYPE);
action.setUserIdentity(USER_IDENTITY);
action.setOperation(OPERATION);
return action;
}
private void assertActionFound(final Action actionFound) {
assertNotNull(actionFound);
assertEquals(ACTION_TIMESTAMP, actionFound.getTimestamp());
assertEquals(SOURCE_ID, actionFound.getSourceId());
assertEquals(SOURCE_NAME, actionFound.getSourceName());
assertEquals(USER_IDENTITY, actionFound.getUserIdentity());
assertEquals(OPERATION, actionFound.getOperation());
}
}
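
The start-date filtering exercised in testAddActionsGetActionsQueryStartDateFiltered amounts to a range query over the timestamp property. A hedged sketch of how such a filter can be expressed with the Xodus StoreTransaction API follows (hypothetical: the service's actual query code is not part of this diff, and timestamps are assumed to be stored as long values):

import jetbrains.exodus.entitystore.Entity;
import jetbrains.exodus.entitystore.EntityIterable;
import jetbrains.exodus.entitystore.PersistentEntityStore;
import jetbrains.exodus.entitystore.PersistentEntityStores;

public class ActionRangeQuerySketch {
    public static void main(final String[] args) {
        final PersistentEntityStore store = PersistentEntityStores.newInstance("./database_repository");
        try {
            store.executeInReadonlyTransaction(txn -> {
                // Find Action entities whose timestamp falls on or after the start date
                final long startMillis = 97500L;
                final EntityIterable matched = txn.find("Action", "timestamp", startMillis, Long.MAX_VALUE);
                for (final Entity action : matched) {
                    System.out.printf("%s %s%n", action.getProperty("timestamp"), action.getProperty("operation"));
                }
            });
        } finally {
            store.close();
        }
    }
}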

View File

@ -41,9 +41,8 @@ nifi.state.management.embedded.zookeeper.start=false
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.VolatileFlowFileRepository

View File

@ -28,9 +28,8 @@ nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -151,10 +151,6 @@
<artifactId>nifi-stateless-engine</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
@ -179,6 +175,14 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>

View File

@ -28,9 +28,8 @@ nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.working.directory=./target/flowcontrollertest/work/nar/
nifi.state.management.configuration.file=src/test/resources/state-management.xml
nifi.state.management.provider.local=local-provider
# H2 Settings
# Database Settings
nifi.database.directory=./target/flowcontrollertest/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/flowcontrollertest/test-repo

View File

@ -28,9 +28,8 @@ nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.working.directory=./target/standardflowserializertest/work/nar/
nifi.state.management.configuration.file=src/test/resources/state-management.xml
nifi.state.management.provider.local=local-provider
# H2 Settings
# Database Settings
nifi.database.directory=./target/standardflowserializertest/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/standardflowserializertest/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.working.directory=./target/standardflowsynchronizerspec/work/nar/
nifi.state.management.configuration.file=src/test/resources/state-management.xml
nifi.state.management.provider.local=local-provider
# H2 Settings
# Database Settings
nifi.database.directory=./target/standardflowsynchronizerspec/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/standardflowsynchronizerspec/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.working.directory=./target/standardprocessschedulertest/work/nar/
nifi.state.management.configuration.file=src/test/resources/state-management.xml
nifi.state.management.provider.local=local-provider
# H2 Settings
# Database Settings
nifi.database.directory=./target/standardprocessschedulertest/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/standardprocessschedulertest/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.library.autoload.directory=./target/nars_with_native_lib
nifi.nar.working.directory=./target/work/nar/
nifi.documentation.working.directory=./target/work/docs/components
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.library.autoload.directory=./target/nars_without_native_lib
nifi.nar.working.directory=./target/work/nar/
nifi.documentation.working.directory=./target/work/docs/components
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.library.autoload.directory=./target/extensions
nifi.nar.working.directory=./target/work/nar/
nifi.documentation.working.directory=./target/work/docs/components
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -31,9 +31,8 @@ nifi.nar.library.directory.alt=./target/NarUnpacker/lib2/
nifi.nar.working.directory=./target/work/nar/
nifi.documentation.working.directory=./target/work/docs/components
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -48,9 +48,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.headless;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.history.PreviousValue;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Headless Audit Service implementation does not persist actions
*/
public class HeadlessAuditService implements AuditService {
@Override
public void addActions(Collection<Action> actions) {
}
@Override
public Map<String, List<PreviousValue>> getPreviousValues(String componentId) {
return null;
}
@Override
public void deletePreviousValues(String propertyName, String componentId) {
}
@Override
public History getActions(HistoryQuery actionQuery) {
return null;
}
@Override
public History getActions(int firstActionId, int maxActions) {
return null;
}
@Override
public Action getAction(Integer actionId) {
return null;
}
@Override
public void purgeActions(Date end, Action purgeAction) {
}
}

View File

@ -18,7 +18,6 @@ package org.apache.nifi.headless;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.impl.StandardAuditService;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.Authorizer;
@ -101,7 +100,7 @@ public class HeadlessNiFiServer implements NiFiServer {
logger.info("Loading Flow...");
FlowFileEventRepository flowFileEventRepository = new RingBufferEventRepository(5);
AuditService auditService = new StandardAuditService();
AuditService auditService = new HeadlessAuditService();
Authorizer authorizer = new Authorizer() {
@Override
public AuthorizationResult authorize(AuthorizationRequest request) throws AuthorizationAccessException {

View File

@ -48,9 +48,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -84,8 +84,6 @@
<nifi.sensitive.props.algorithm>NIFI_PBKDF2_AES_GCM_256</nifi.sensitive.props.algorithm>
<nifi.sensitive.props.additional.keys />
<nifi.h2.url.append>;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE</nifi.h2.url.append>
<nifi.remote.input.socket.port>9990</nifi.remote.input.socket.port>
<nifi.remote.input.secure>true</nifi.remote.input.secure>

View File

@ -192,6 +192,9 @@
<!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
<logger name="org.apache.atlas" level="WARN"/>
<!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
<logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
<!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
<logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
<appender-ref ref="APP_FILE"/>

View File

@ -66,9 +66,8 @@ nifi.state.management.embedded.zookeeper.start=${nifi.state.management.embedded.
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=${nifi.state.management.embedded.zookeeper.properties}
# H2 Settings
# Database Settings
nifi.database.directory=${nifi.database.directory}
nifi.h2.url.append=${nifi.h2.url.append}
# Repository Encryption properties override individual repository implementation properties
nifi.repository.encryption.protocol.version=

View File

@ -49,9 +49,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./target/conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -48,9 +48,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./target/conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -33,10 +33,6 @@ nifi.flowcontroller.graceful.shutdown.seconds=10
nifi.nar.library.directory=./lib
nifi.nar.working.directory=./work/nar/
nifi.flowservice.writedelay.seconds=2
nifi.h2.repository.maxmemoryrows=100000
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
nifi.h2.max.connections=20
nifi.h2.login.timeout=500
#For testing purposes. Default value should actually be empty!
nifi.remote.input.socket.port=5000
nifi.remote.input.secure=true

View File

@ -16,11 +16,18 @@
*/
package org.apache.nifi.web;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.admin.service.EntityStoreAuditService;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.configuration.WebSecurityConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportResource;
import java.io.File;
/**
* Web Application Spring Configuration
*/
@ -29,7 +36,6 @@ import org.springframework.context.annotation.ImportResource;
WebSecurityConfiguration.class
})
@ImportResource({"classpath:nifi-context.xml",
"classpath:nifi-administration-context.xml",
"classpath:nifi-authorizer-context.xml",
"classpath:nifi-cluster-manager-context.xml",
"classpath:nifi-cluster-protocol-context.xml",
@ -40,4 +46,16 @@ public class NiFiWebApiConfiguration {
super();
}
/**
* Audit Service implementation from nifi-administration
*
* @param properties NiFi Properties
* @return Audit Service implementation using Persistent Entity Store
*/
@Autowired
@Bean
public AuditService auditService(final NiFiProperties properties) {
final File databaseDirectory = properties.getDatabaseRepositoryPath().toFile();
return new EntityStoreAuditService(databaseDirectory);
}
}
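
The auditService bean above resolves nifi.database.directory through NiFiProperties and hands the resulting directory to EntityStoreAuditService, which replaces the H2-backed StandardAuditService. The same wiring can be sketched outside Spring; a minimal, hypothetical example, assuming a conventional nifi.properties location and no property overrides:

import org.apache.nifi.admin.service.EntityStoreAuditService;
import org.apache.nifi.util.NiFiProperties;

import java.io.File;
import java.util.Collections;

public class AuditServiceWiringSketch {
    public static void main(final String[] args) throws Exception {
        // Resolve nifi.database.directory from the properties file and build the service
        final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties("./conf/nifi.properties", Collections.emptyMap());
        final File databaseDirectory = properties.getDatabaseRepositoryPath().toFile();
        final EntityStoreAuditService auditService = new EntityStoreAuditService(databaseDirectory);
        try {
            // AuditService methods such as addActions() and getAction() now operate on the Xodus-backed store
        } finally {
            auditService.close();
        }
    }
}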

View File

@ -34,9 +34,8 @@ nifi.state.management.embedded.zookeeper.max.instances=3
nifi.state.management.provider.local=local-provider
nifi.state.management.provider.cluster=
# H2 Settings
# Database Settings
nifi.database.directory=target/test-classes/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository

View File

@ -34,9 +34,8 @@ nifi.state.management.embedded.zookeeper.max.instances=3
nifi.state.management.provider.local=local-provider
nifi.state.management.provider.cluster=
# H2 Settings
# Database Settings
nifi.database.directory=target/test-classes/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository

View File

@ -40,9 +40,8 @@ nifi.state.management.embedded.zookeeper.max.instances=3
nifi.state.management.provider.local=local-provider
nifi.state.management.provider.cluster=
# H2 Settings
# Database Settings
nifi.database.directory=target/test-classes/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository

View File

@ -34,9 +34,8 @@ nifi.state.management.embedded.zookeeper.max.instances=3
nifi.state.management.provider.local=local-provider
nifi.state.management.provider.cluster=
# H2 Settings
# Database Settings
nifi.database.directory=target/test-classes/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.provenance.repository.implementation=org.apache.nifi.provenance.VolatileProvenanceRepository

View File

@ -45,9 +45,8 @@ nifi.state.management.embedded.zookeeper.start=${nifi.state.management.embedded.
nifi.state.management.embedded.zookeeper.properties=${nifi.state.management.embedded.zookeeper.properties}
# H2 Settings
# Database Settings
nifi.database.directory=${nifi.database.directory}
nifi.h2.url.append=${nifi.h2.url.append}
# FlowFile Repository
nifi.flowfile.repository.implementation=${nifi.flowfile.repository.implementation}

View File

@ -260,11 +260,6 @@
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
<!-- open id connect - override transitive dependency version ranges -->
<dependency>

View File

@ -64,9 +64,8 @@ nifi.state.management.embedded.zookeeper.start=true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -64,9 +64,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -64,9 +64,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -68,9 +68,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository

View File

@ -29,9 +29,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -29,9 +29,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -29,9 +29,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -29,9 +29,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -30,9 +30,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -29,9 +29,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -30,9 +30,8 @@ nifi.nar.library.directory=./target/resources/NiFiProperties/lib/
nifi.nar.library.directory.alt=./target/resources/NiFiProperties/lib2/
nifi.nar.working.directory=./target/work/nar/
# H2 Settings
# Database Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.directory=./target/test-repo

View File

@ -48,9 +48,8 @@ nifi.state.management.embedded.zookeeper.start=false
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
# H2 Settings
# Database Settings
nifi.database.directory=./database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository