mirror of https://github.com/apache/nifi.git
NIFI-2145: Auto flow.xml archive
- Added following properties: - nifi.flow.configuration.archive.enabled - nifi.flow.configuration.archive.max.time - nifi.flow.configuration.archive.max.storage - Removed manual archive operation: - Removed 'Back-up flow' link from UI since it's not needed any longer - Removed corresponding REST API controller/archive and its implementations - Added FlowConfigurationArchiveManager to enclose archive related code - Updated related docs
This commit is contained in:
parent
cd1bccef8c
commit
30889995cb
|
@ -368,7 +368,10 @@ language governing permissions and limitations under the License. -->
|
||||||
<nifi.bored.yield.duration>10 millis</nifi.bored.yield.duration>
|
<nifi.bored.yield.duration>10 millis</nifi.bored.yield.duration>
|
||||||
|
|
||||||
<nifi.flow.configuration.file>./conf/flow.xml.gz</nifi.flow.configuration.file>
|
<nifi.flow.configuration.file>./conf/flow.xml.gz</nifi.flow.configuration.file>
|
||||||
|
<nifi.flow.configuration.archive.enabled>true</nifi.flow.configuration.archive.enabled>
|
||||||
<nifi.flow.configuration.archive.dir>./conf/archive/</nifi.flow.configuration.archive.dir>
|
<nifi.flow.configuration.archive.dir>./conf/archive/</nifi.flow.configuration.archive.dir>
|
||||||
|
<nifi.flow.configuration.archive.max.time>30 days</nifi.flow.configuration.archive.max.time>
|
||||||
|
<nifi.flow.configuration.archive.max.storage>500 MB</nifi.flow.configuration.archive.max.storage>
|
||||||
<nifi.login.identity.provider.configuration.file>./conf/login-identity-providers.xml</nifi.login.identity.provider.configuration.file>
|
<nifi.login.identity.provider.configuration.file>./conf/login-identity-providers.xml</nifi.login.identity.provider.configuration.file>
|
||||||
<nifi.authorizer.configuration.file>./conf/authorizers.xml</nifi.authorizer.configuration.file>
|
<nifi.authorizer.configuration.file>./conf/authorizers.xml</nifi.authorizer.configuration.file>
|
||||||
<nifi.templates.directory>./conf/templates</nifi.templates.directory>
|
<nifi.templates.directory>./conf/templates</nifi.templates.directory>
|
||||||
|
|
|
@ -43,7 +43,10 @@ public class NiFiProperties extends Properties {
|
||||||
// core properties
|
// core properties
|
||||||
public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path";
|
public static final String PROPERTIES_FILE_PATH = "nifi.properties.file.path";
|
||||||
public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file";
|
public static final String FLOW_CONFIGURATION_FILE = "nifi.flow.configuration.file";
|
||||||
public static final String FLOW_CONFIGURATION_ARCHIVE_FILE = "nifi.flow.configuration.archive.file";
|
public static final String FLOW_CONFIGURATION_ARCHIVE_ENABLED = "nifi.flow.configuration.archive.enabled";
|
||||||
|
public static final String FLOW_CONFIGURATION_ARCHIVE_DIR = "nifi.flow.configuration.archive.dir";
|
||||||
|
public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "nifi.flow.configuration.archive.max.time";
|
||||||
|
public static final String FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "nifi.flow.configuration.archive.max.storage";
|
||||||
public static final String AUTHORIZER_CONFIGURATION_FILE = "nifi.authorizer.configuration.file";
|
public static final String AUTHORIZER_CONFIGURATION_FILE = "nifi.authorizer.configuration.file";
|
||||||
public static final String LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "nifi.login.identity.provider.configuration.file";
|
public static final String LOGIN_IDENTITY_PROVIDER_CONFIGURATION_FILE = "nifi.login.identity.provider.configuration.file";
|
||||||
public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory";
|
public static final String REPOSITORY_DATABASE_DIRECTORY = "nifi.database.directory";
|
||||||
|
@ -215,6 +218,9 @@ public class NiFiProperties extends Properties {
|
||||||
public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
|
public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
|
||||||
public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
|
public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
|
||||||
public static final String DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL = "30 secs";
|
public static final String DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL = "30 secs";
|
||||||
|
public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_ENABLED = "true";
|
||||||
|
public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME = "30 days";
|
||||||
|
public static final String DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE = "500 MB";
|
||||||
|
|
||||||
// cluster common defaults
|
// cluster common defaults
|
||||||
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
|
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
|
||||||
|
@ -941,4 +947,20 @@ public class NiFiProperties extends Properties {
|
||||||
public boolean isStartEmbeddedZooKeeper() {
|
public boolean isStartEmbeddedZooKeeper() {
|
||||||
return Boolean.parseBoolean(getProperty(STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER));
|
return Boolean.parseBoolean(getProperty(STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isFlowConfigurationArchiveEnabled() {
|
||||||
|
return Boolean.parseBoolean(getProperty(FLOW_CONFIGURATION_ARCHIVE_ENABLED, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_ENABLED));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFlowConfigurationArchiveDir() {
|
||||||
|
return getProperty(FLOW_CONFIGURATION_ARCHIVE_DIR);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFlowConfigurationArchiveMaxTime() {
|
||||||
|
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFlowConfigurationArchiveMaxStorage() {
|
||||||
|
return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE, DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_STORAGE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1144,7 +1144,10 @@ The first section of the _nifi.properties_ file is for the Core Properties. Thes
|
||||||
|*Property*|*Description*
|
|*Property*|*Description*
|
||||||
|nifi.version|The version number of the current release. If upgrading but reusing this file, be sure to update this value.
|
|nifi.version|The version number of the current release. If upgrading but reusing this file, be sure to update this value.
|
||||||
|nifi.flow.configuration.file*|The location of the flow configuration file (i.e., the file that contains what is currently displayed on the NiFi graph). The default value is ./conf/flow.xml.gz.
|
|nifi.flow.configuration.file*|The location of the flow configuration file (i.e., the file that contains what is currently displayed on the NiFi graph). The default value is ./conf/flow.xml.gz.
|
||||||
|nifi.flow.configuration.archive.dir*|The location of the archive directory where backup copies of the flow.xml are saved. The default value is ./conf/archive.
|
|nifi.flow.configuration.archive.enabled*|Specify whether NiFi creates backup copy of the flow.xml automatically when NiFi updates the flow.xml. The default value is _true_.
|
||||||
|
|nifi.flow.configuration.archive.dir*|The location of the archive directory where backup copies of the flow.xml are saved. The default value is ./conf/archive. NiFi removes old archive files to limit disk usage based on file lifespan and total size as specified with max.time and max.storage below. However, this cleanup mechanism takes only automatically created archive flow.xml files into account. Meaning if there is other files or directories in this archive directory, NiFi ignores it. Automatically created archives have filename with ISO 8601 format timestamp prefix followed by '_<original-filename>'. ex) 20160706T160719+0900_flow.xml.gz . NiFi check filename pattern when it cleans archive directory. If you would like to keep a particular archive in this directory without worrying about being deleted by NiFi, you can do so by copying it with different filename pattern.
|
||||||
|
|nifi.flow.configuration.archive.max.time*|The lifespan of archived flow.xml files. NiFi will delete expired archive files when it updates flow.xml. Expiration is determined based on current system time and the last modified timestamp of an archived flow.xml. The default value is 30 days.
|
||||||
|
|nifi.flow.configuration.archive.max.storage*|The total data size allowed for the archived flow.xml files. NiFi will delete oldest archive files until the total archived file size becomes less than this configuration. The default value is 500 MB.
|
||||||
|nifi.flowcontroller.autoResumeState|Indicates whether -upon restart- the components on the NiFi graph should return to their last state. The default value is _true_.
|
|nifi.flowcontroller.autoResumeState|Indicates whether -upon restart- the components on the NiFi graph should return to their last state. The default value is _true_.
|
||||||
|nifi.flowcontroller.graceful.shutdown.period|Indicates the shutdown period. The default value is 10 sec.
|
|nifi.flowcontroller.graceful.shutdown.period|Indicates the shutdown period. The default value is 10 sec.
|
||||||
|nifi.flowservice.writedelay.interval|When many changes are made to the flow.xml, this property specifies how long to wait before writing out the changes, so as to batch the changes into a single write. The default value is 500 ms.
|
|nifi.flowservice.writedelay.interval|When many changes are made to the flow.xml, this property specifies how long to wait before writing out the changes, so as to batch the changes into a single write. The default value is 500 ms.
|
||||||
|
|
|
@ -117,10 +117,14 @@ Terminology
|
||||||
As a result, several components may be combined together to make a larger building block from which to create a dataflow.
|
As a result, several components may be combined together to make a larger building block from which to create a dataflow.
|
||||||
These templates can also be exported as XML and imported into another NiFi instance, allowing these building blocks to be shared.
|
These templates can also be exported as XML and imported into another NiFi instance, allowing these building blocks to be shared.
|
||||||
|
|
||||||
*flow.xml.gz*: Everything the DFM puts onto the NiFi User Interface canvas is written, in real time, to one file called the flow.xml.gz. This file is located in the nifi/conf directory.
|
*flow.xml.gz*: Everything the DFM puts onto the NiFi User Interface canvas is written, in real time, to one file called the flow.xml.gz. This file is located in the nifi/conf directory by default.
|
||||||
Any change made on the canvas is automatically saved to this file, without the user needing to click a "save" button. In addition, the user may create a back-up copy of this file at any time
|
Any change made on the canvas is automatically saved to this file, without the user needing to click a "save" button.
|
||||||
by selecting the Controller Settings button in the far-right section of the tool bar and clicking "Back-up flow" on the General tab. By default, this action saves a copy of the current flow in the nifi/conf/archive directory.
|
In addition, NiFi automatically create a back-up copy of this file in the archive directory when it is updated.
|
||||||
See <<Controller_Settings>> for a description of where the "Back-up flow" button may be found. (Note that in a NiFi Cluster, the NiFi Cluster Manager's copy of this file is named flow.tar, whereas this file is still named flow.xml.gz on the nodes.)
|
You can use these archived files to rollback flow configuration. To do so, stop NiFi, replace flow.xml.gz with a desired back-up copy, then restart NiFi.
|
||||||
|
With a clustered environment, stop whole NiFi cluster, replace flow.xml.gz of one of nodes, start the node first. Remove flow.xml.gz from other nodes.
|
||||||
|
Once you confirmed the node starts up as one node cluster, start other nodes. Then the replaced flow configration will be synchronized across cluster.
|
||||||
|
The name and location of flow.xml.gz, and auto archive behavior are configurable, see link:administration-guide.html#core-properties-br[Admin Guide] for further detail.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
[[User_Interface]]
|
[[User_Interface]]
|
||||||
|
@ -572,7 +576,7 @@ image:controller-settings-button.png["Controller Settings Button", width=200]
|
||||||
|
|
||||||
The Controller Settings window has three tabs across the top: General, Controller Services, and Reporting Tasks. The General tab is for settings that pertain to general information about the NiFi instance. For example, here, the DFM can provide a unique name for the overall dataflow, as well as comments that describe the flow. Be aware that this information is visible to any other NiFi instance that connects remotely to this instance (using Remote Process Groups, a.k.a., Site-to-Site).
|
The Controller Settings window has three tabs across the top: General, Controller Services, and Reporting Tasks. The General tab is for settings that pertain to general information about the NiFi instance. For example, here, the DFM can provide a unique name for the overall dataflow, as well as comments that describe the flow. Be aware that this information is visible to any other NiFi instance that connects remotely to this instance (using Remote Process Groups, a.k.a., Site-to-Site).
|
||||||
|
|
||||||
The General tab also provides settings for the overall maximum thread counts of the instance, as well as the ability to click "Back-up flow" to create a backup copy of the current flow, which is saved by default in the /conf/archive directory.
|
The General tab also provides settings for the overall maximum thread counts of the instance.
|
||||||
|
|
||||||
image:settings-general-tab.png["Controller Settings General Tab", width=700]
|
image:settings-general-tab.png["Controller Settings General Tab", width=700]
|
||||||
|
|
||||||
|
@ -1636,7 +1640,7 @@ Other Management Features
|
||||||
|
|
||||||
In addition to the Summary Page, Data Provenance Page, Template Management Page, and Bulletin Board Page, there are other tools in the Management Toolbar (See <<User_Interface>>) that are useful to the DFM. The Flow Configuration History, which is available by clicking on the clock icon ( image:iconFlowHistory.png["Flow History", width=28] ) in the Management Toolbar, shows all the changes that have been made to the dataflow. The history can aid in troubleshooting, such as if a recent change to the dataflow has caused a problem and needs to be fixed. The DFM can see what changes have been made and adjust the flow as needed to fix the problem. While NiFi does not have an "undo" feature, the DFM can make new changes to the dataflow that will fix the problem.
|
In addition to the Summary Page, Data Provenance Page, Template Management Page, and Bulletin Board Page, there are other tools in the Management Toolbar (See <<User_Interface>>) that are useful to the DFM. The Flow Configuration History, which is available by clicking on the clock icon ( image:iconFlowHistory.png["Flow History", width=28] ) in the Management Toolbar, shows all the changes that have been made to the dataflow. The history can aid in troubleshooting, such as if a recent change to the dataflow has caused a problem and needs to be fixed. The DFM can see what changes have been made and adjust the flow as needed to fix the problem. While NiFi does not have an "undo" feature, the DFM can make new changes to the dataflow that will fix the problem.
|
||||||
|
|
||||||
Two other tools in the Management Toolbar are the Controller Settings page ( image:iconSettings.png["Settings", width=28] ) and the Users page ( image:iconUsers.png["Users", width=28] ). The Controller Settings page provides the ability to change the name of the NiFi instance, add comments describing the NiFi instance, set the maximum number of threads that are available to the application, and create a back-up copy of the dataflow(s) currently on the canvas. It also provides tabs where DFMs may add and configure Controller Services and Reporting Tasks (see <<Controller_Services_and_Reporting_Tasks>>). The Users page is used to manage user access, which is described in the link:administration-guide.html[Admin Guide].
|
Two other tools in the Management Toolbar are the Controller Settings page ( image:iconSettings.png["Settings", width=28] ) and the Users page ( image:iconUsers.png["Users", width=28] ). The Controller Settings page provides the ability to change the name of the NiFi instance, add comments describing the NiFi instance, set the maximum number of threads that are available to the application. It also provides tabs where DFMs may add and configure Controller Services and Reporting Tasks (see <<Controller_Services_and_Reporting_Tasks>>). The Users page is used to manage user access, which is described in the link:administration-guide.html[Admin Guide].
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -102,13 +102,6 @@ public interface FlowService extends LifeCycle {
|
||||||
*/
|
*/
|
||||||
void copyCurrentFlow(OutputStream os) throws IOException;
|
void copyCurrentFlow(OutputStream os) throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a copy of the current flow and saves it in the configured 'archive' directory
|
|
||||||
*
|
|
||||||
* @throws IOException if unable to write to the archive directory
|
|
||||||
*/
|
|
||||||
void archiveFlow() throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a DataFlow object from the current flow
|
* Creates a DataFlow object from the current flow
|
||||||
*
|
*
|
||||||
|
|
|
@ -67,7 +67,6 @@ import java.io.BufferedInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -223,19 +222,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void archiveFlow() throws IOException {
|
|
||||||
writeLock.lock();
|
|
||||||
try {
|
|
||||||
final File archiveFile = dao.createArchiveFile();
|
|
||||||
try (final OutputStream out = new FileOutputStream(archiveFile)) {
|
|
||||||
dao.load(out, true);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
writeLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveFlowChanges() throws IOException {
|
public void saveFlowChanges() throws IOException {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -269,7 +255,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void saveFlowChanges(final TimeUnit delayUnit, final long delay) {
|
public void saveFlowChanges(final TimeUnit delayUnit, final long delay) {
|
||||||
saveFlowChanges(delayUnit, delay, false);
|
final boolean archiveEnabled = NiFiProperties.getInstance().isFlowConfigurationArchiveEnabled();
|
||||||
|
saveFlowChanges(delayUnit, delay, archiveEnabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -944,7 +931,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
final Calendar now = Calendar.getInstance();
|
final Calendar now = Calendar.getInstance();
|
||||||
if (holder.saveTime.before(now) || holder.shouldArchive) {
|
if (holder.saveTime.before(now)) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Waiting for write lock and then will save");
|
logger.trace("Waiting for write lock and then will save");
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,193 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.persistence;
|
||||||
|
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
|
import java.util.Calendar;
|
||||||
|
import java.util.GregorianCalendar;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TimeZone;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class FlowConfigurationArchiveManager {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(FlowConfigurationArchiveManager.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents archive file name such as followings:
|
||||||
|
* <li>yyyyMMddTHHmmss+HHmm_original-file-name</li>
|
||||||
|
* <li>yyyyMMddTHHmmss-HHmm_original-file-name</li>
|
||||||
|
* <li>yyyyMMddTHHmmssZ_original-file-name</li>
|
||||||
|
*/
|
||||||
|
private final Pattern archiveFilenamePattern = Pattern.compile("^([\\d]{8}T[\\d]{6}([\\+\\-][\\d]{4}|Z))_.+$");
|
||||||
|
private final Path flowFile;
|
||||||
|
private final Path archiveDir;
|
||||||
|
private final long maxTimeMillis;
|
||||||
|
private final long maxStorageBytes;
|
||||||
|
|
||||||
|
public FlowConfigurationArchiveManager(final Path flowFile, NiFiProperties properties) {
|
||||||
|
final String archiveDirVal = properties.getFlowConfigurationArchiveDir();
|
||||||
|
final Path archiveDir = (archiveDirVal == null || archiveDirVal.equals(""))
|
||||||
|
? flowFile.getParent().resolve("archive") : new File(archiveDirVal).toPath();
|
||||||
|
|
||||||
|
final long archiveMaxTime =
|
||||||
|
FormatUtils.getTimeDuration(properties.getFlowConfigurationArchiveMaxTime(), TimeUnit.MILLISECONDS);
|
||||||
|
final long archiveMaxStorage =
|
||||||
|
DataUnit.parseDataSize(properties.getFlowConfigurationArchiveMaxStorage(), DataUnit.B).longValue();
|
||||||
|
|
||||||
|
this.flowFile = flowFile;
|
||||||
|
this.archiveDir = archiveDir;
|
||||||
|
this.maxTimeMillis = archiveMaxTime;
|
||||||
|
this.maxStorageBytes = archiveMaxStorage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FlowConfigurationArchiveManager(final Path flowFile, final Path archiveDir, long maxTimeMillis, long maxStorageBytes) {
|
||||||
|
this.flowFile = flowFile;
|
||||||
|
this.archiveDir = archiveDir;
|
||||||
|
this.maxTimeMillis = maxTimeMillis;
|
||||||
|
this.maxStorageBytes = maxStorageBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String createArchiveFileName(final String originalFlowFileName) {
|
||||||
|
TimeZone tz = TimeZone.getDefault();
|
||||||
|
Calendar cal = GregorianCalendar.getInstance(tz);
|
||||||
|
int offsetInMillis = tz.getOffset(cal.getTimeInMillis());
|
||||||
|
final int year = cal.get(Calendar.YEAR);
|
||||||
|
final int month = cal.get(Calendar.MONTH) + 1;
|
||||||
|
final int day = cal.get(Calendar.DAY_OF_MONTH);
|
||||||
|
final int hour = cal.get(Calendar.HOUR_OF_DAY);
|
||||||
|
final int min = cal.get(Calendar.MINUTE);
|
||||||
|
final int sec = cal.get(Calendar.SECOND);
|
||||||
|
|
||||||
|
String offset = String.format("%s%02d%02d",
|
||||||
|
(offsetInMillis >= 0 ? "+" : "-"),
|
||||||
|
Math.abs(offsetInMillis / 3600000),
|
||||||
|
Math.abs((offsetInMillis / 60000) % 60));
|
||||||
|
|
||||||
|
return String.format("%d%02d%02dT%02d%02d%02d%s_%s",
|
||||||
|
year, month, day, hour, min, sec, offset, originalFlowFileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup a file to archive data flow. Create archive directory if it doesn't exist yet.
|
||||||
|
* @return Resolved archive file which is ready to write to
|
||||||
|
* @throws IOException when it fails to access archive dir
|
||||||
|
*/
|
||||||
|
public File setupArchiveFile() throws IOException {
|
||||||
|
Files.createDirectories(archiveDir);
|
||||||
|
|
||||||
|
if (!Files.isDirectory(archiveDir)) {
|
||||||
|
throw new IOException("Archive directory doesn't appear to be a directory " + archiveDir);
|
||||||
|
}
|
||||||
|
final String originalFlowFileName = flowFile.getFileName().toString();
|
||||||
|
final Path archiveFile = archiveDir.resolve(createArchiveFileName(originalFlowFileName));
|
||||||
|
return archiveFile.toFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Archive current flow configuration file by copying the original file to the archive directory.
|
||||||
|
* After creating new archive file:
|
||||||
|
* <li>It removes expired archive files based on its last modification date and maxTimeMillis</li>
|
||||||
|
* <li>It removes old files until total size of archive files becomes less than maxStorageBytes</li>
|
||||||
|
* This method keeps other files intact, so that users can keep particular archive by copying it with different name.
|
||||||
|
* Whether a given file is archive file or not is determined by the filename.
|
||||||
|
* Since archive file name consists of timestamp up to seconds, if archive is called multiple times within a second,
|
||||||
|
* it will overwrite existing archive file with the same name.
|
||||||
|
* @return Newly created archive file, archive filename is computed by adding ISO8601
|
||||||
|
* timestamp prefix to the original filename, ex) 20160706T160719+0900_flow.xml.gz
|
||||||
|
* @throws IOException If it fails to create new archive file.
|
||||||
|
* Although, other IOExceptions like the ones thrown during removing expired archive files will not be thrown.
|
||||||
|
*/
|
||||||
|
public File archive() throws IOException {
|
||||||
|
final String originalFlowFileName = flowFile.getFileName().toString();
|
||||||
|
final File archiveFile = setupArchiveFile();
|
||||||
|
Files.copy(flowFile, archiveFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
|
||||||
|
// Collect archive files by its name, and group by expiry state.
|
||||||
|
final long now = System.currentTimeMillis();
|
||||||
|
final Map<Boolean, List<Path>> oldArchives = Files.walk(archiveDir, 1).filter(p -> {
|
||||||
|
final String filename = p.getFileName().toString();
|
||||||
|
if (Files.isRegularFile(p) && filename.endsWith("_" + originalFlowFileName)) {
|
||||||
|
final Matcher matcher = archiveFilenamePattern.matcher(filename);
|
||||||
|
if (matcher.matches() && filename.equals(matcher.group(1) + "_" + originalFlowFileName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}).collect(Collectors.groupingBy(p -> (now - p.toFile().lastModified()) > maxTimeMillis, Collectors.toList()));
|
||||||
|
|
||||||
|
logger.debug("oldArchives={}", oldArchives);
|
||||||
|
|
||||||
|
// Remove expired files
|
||||||
|
final List<Path> expiredArchives = oldArchives.get(true);
|
||||||
|
if (expiredArchives != null) {
|
||||||
|
expiredArchives.stream().forEach(p -> {
|
||||||
|
try {
|
||||||
|
logger.info("Removing expired archive file {}", p);
|
||||||
|
Files.delete(p);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("Failed to delete expired archive {} due to {}", p, e.toString());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate size
|
||||||
|
final List<Path> remainingArchives = oldArchives.get(false);
|
||||||
|
final long totalArchiveSize = remainingArchives.stream().mapToLong(p -> {
|
||||||
|
try {
|
||||||
|
return Files.size(p);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("Failed to get file size of {} due to {}", p, e.toString());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}).sum();
|
||||||
|
logger.debug("totalArchiveSize={}", totalArchiveSize);
|
||||||
|
|
||||||
|
// Remove old files until total size gets less than max storage size
|
||||||
|
remainingArchives.sort((a, b)
|
||||||
|
-> Long.valueOf(a.toFile().lastModified()).compareTo(Long.valueOf(b.toFile().lastModified())));
|
||||||
|
long reducedTotalArchiveSize = totalArchiveSize;
|
||||||
|
for (int i = 0; i < remainingArchives.size()
|
||||||
|
&& reducedTotalArchiveSize > maxStorageBytes; i++) {
|
||||||
|
final Path path = remainingArchives.get(i);
|
||||||
|
try {
|
||||||
|
logger.info("Removing archive file {} to reduce storage usage. currentSize={}", path, reducedTotalArchiveSize);
|
||||||
|
final long size = Files.size(path);
|
||||||
|
Files.delete(path);
|
||||||
|
reducedTotalArchiveSize -= size;
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("Failed to delete {} to reduce storage usage, due to {}", path, e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return archiveFile;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.persistence;
|
package org.apache.nifi.persistence;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -111,11 +110,4 @@ public interface FlowConfigurationDAO {
|
||||||
*/
|
*/
|
||||||
void save(FlowController flow, boolean archive) throws IOException;
|
void save(FlowController flow, boolean archive) throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a File that can be used to write an archive to. The file will not actually exist on disk.
|
|
||||||
*
|
|
||||||
* @return a File that can be used to write an archive to
|
|
||||||
* @throws IOException if unable to access the required directories
|
|
||||||
*/
|
|
||||||
File createArchiveFile() throws IOException;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,10 +43,9 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationDAO {
|
public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationDAO {
|
||||||
|
|
||||||
public static final String CONFIGURATION_ARCHIVE_DIR_KEY = "nifi.flow.configuration.archive.dir";
|
|
||||||
|
|
||||||
private final Path flowXmlPath;
|
private final Path flowXmlPath;
|
||||||
private final StringEncryptor encryptor;
|
private final StringEncryptor encryptor;
|
||||||
|
private final FlowConfigurationArchiveManager archiveManager;
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
|
private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class);
|
||||||
|
|
||||||
|
@ -65,8 +64,9 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
|
||||||
|
|
||||||
this.flowXmlPath = flowXml;
|
this.flowXmlPath = flowXml;
|
||||||
this.encryptor = encryptor;
|
this.encryptor = encryptor;
|
||||||
}
|
|
||||||
|
|
||||||
|
this.archiveManager = new FlowConfigurationArchiveManager(flowXmlPath, NiFiProperties.getInstance());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isFlowPresent() {
|
public boolean isFlowPresent() {
|
||||||
|
@ -160,8 +160,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
|
||||||
|
|
||||||
if (archive) {
|
if (archive) {
|
||||||
try {
|
try {
|
||||||
final File archiveFile = createArchiveFile();
|
archiveManager.archive();
|
||||||
Files.copy(configFile, archiveFile.toPath());
|
|
||||||
} catch (final Exception ex) {
|
} catch (final Exception ex) {
|
||||||
LOG.warn("Unable to archive flow configuration as requested due to " + ex);
|
LOG.warn("Unable to archive flow configuration as requested due to " + ex);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -171,16 +170,4 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public File createArchiveFile() throws IOException {
|
|
||||||
final String archiveDirVal = NiFiProperties.getInstance().getProperty(CONFIGURATION_ARCHIVE_DIR_KEY);
|
|
||||||
final Path archiveDir = (archiveDirVal == null || archiveDirVal.equals("")) ? flowXmlPath.getParent().resolve("archive") : new File(archiveDirVal).toPath();
|
|
||||||
Files.createDirectories(archiveDir);
|
|
||||||
|
|
||||||
if (!Files.isDirectory(archiveDir)) {
|
|
||||||
throw new IOException("Archive directory doesn't appear to be a directory " + archiveDir);
|
|
||||||
}
|
|
||||||
final Path archiveFile = archiveDir.resolve(System.nanoTime() + "-" + flowXmlPath.toFile().getName());
|
|
||||||
return archiveFile.toFile();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,164 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.persistence;
|
||||||
|
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.file.FileVisitResult;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.NoSuchFileException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.SimpleFileVisitor;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.nio.file.attribute.BasicFileAttributes;
|
||||||
|
import java.nio.file.attribute.FileTime;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class TestFlowConfigurationArchiveManager {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(TestFlowConfigurationArchiveManager.class);
|
||||||
|
private final File flowFile = new File("./target/flow-archive/flow.xml.gz");
|
||||||
|
private final File archiveDir = new File("./target/flow-archive");
|
||||||
|
private final long maxTime = FormatUtils.getTimeDuration("30 days", TimeUnit.MILLISECONDS);
|
||||||
|
private long maxStorage = DataUnit.parseDataSize("500 MB", DataUnit.B).longValue();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
|
||||||
|
// Clean up old files.
|
||||||
|
if (Files.isDirectory(archiveDir.toPath())) {
|
||||||
|
Files.walkFileTree(archiveDir.toPath(), new SimpleFileVisitor<Path>(){
|
||||||
|
@Override
|
||||||
|
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
||||||
|
Files.delete(file);
|
||||||
|
return FileVisitResult.CONTINUE;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create original flow.xml.gz
|
||||||
|
Files.createDirectories(flowFile.getParentFile().toPath());
|
||||||
|
try (OutputStream os = Files.newOutputStream(flowFile.toPath(),
|
||||||
|
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
|
||||||
|
// 10 bytes.
|
||||||
|
os.write("0123456789".getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = NoSuchFileException.class)
|
||||||
|
public void testArchiveWithoutOriginalFile() throws Exception {
|
||||||
|
final File flowFile = new File("does-not-exist");
|
||||||
|
final FlowConfigurationArchiveManager archiveManager =
|
||||||
|
new FlowConfigurationArchiveManager(flowFile.toPath(), archiveDir.toPath(), maxTime, maxStorage);
|
||||||
|
|
||||||
|
archiveManager.archive();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createSimulatedOldArchives(final File[] oldArchives, final long intervalMillis) throws Exception {
|
||||||
|
|
||||||
|
// Create old archive files. Altering file name and last modified date to simulate existing files.
|
||||||
|
final long now = System.currentTimeMillis();
|
||||||
|
final SimpleDateFormat dateFormat = new SimpleDateFormat("HHmmss");
|
||||||
|
|
||||||
|
FlowConfigurationArchiveManager archiveManager =
|
||||||
|
new FlowConfigurationArchiveManager(flowFile.toPath(), archiveDir.toPath(), maxTime, maxStorage);
|
||||||
|
|
||||||
|
for (int i = oldArchives.length; i > 0; i--) {
|
||||||
|
final Date date = new Date(now - (intervalMillis * i));
|
||||||
|
final String hhmmss = dateFormat.format(date);
|
||||||
|
|
||||||
|
final File archiveFile = archiveManager.archive();
|
||||||
|
final String renamedArchiveName = archiveFile.getName().replaceFirst("T[\\d]{6}", "T" + hhmmss);
|
||||||
|
final File renamedArchive = archiveFile.getParentFile().toPath().resolve(renamedArchiveName).toFile();
|
||||||
|
archiveFile.renameTo(renamedArchive);
|
||||||
|
|
||||||
|
Files.setLastModifiedTime(renamedArchive.toPath(), FileTime.fromMillis(date.getTime()));
|
||||||
|
|
||||||
|
oldArchives[oldArchives.length - i] = renamedArchive;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveExpiration() throws Exception {
|
||||||
|
|
||||||
|
final long intervalMillis = 60_000;
|
||||||
|
File[] oldArchives = new File[5];
|
||||||
|
createSimulatedOldArchives(oldArchives, intervalMillis);
|
||||||
|
|
||||||
|
// Now, we will test expiration. There should be following old archives created above:
|
||||||
|
// -5 min, -4 min, -3min, -2min, -1min
|
||||||
|
// if maxTime = 3.5min, The oldest two files should be removed, -5 min and -4 min,
|
||||||
|
// resulting four files of -3min, -2min, -1min, and newly created archive.
|
||||||
|
final long maxTimeForExpirationTest = intervalMillis * 3 + (intervalMillis / 2);
|
||||||
|
FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(),
|
||||||
|
archiveDir.toPath(), maxTimeForExpirationTest, maxStorage);
|
||||||
|
|
||||||
|
final File archive = archiveManager.archive();
|
||||||
|
assertTrue(archive.isFile());
|
||||||
|
|
||||||
|
assertFalse(oldArchives[0].exists());
|
||||||
|
assertFalse(oldArchives[1].exists());
|
||||||
|
assertTrue(oldArchives[2].isFile());
|
||||||
|
assertTrue(oldArchives[3].isFile());
|
||||||
|
assertTrue(oldArchives[4].isFile());
|
||||||
|
|
||||||
|
assertTrue("Original file should remain intact", flowFile.isFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveStorageSizeLimit() throws Exception {
|
||||||
|
|
||||||
|
final long intervalMillis = 60_000;
|
||||||
|
File[] oldArchives = new File[5];
|
||||||
|
createSimulatedOldArchives(oldArchives, intervalMillis);
|
||||||
|
|
||||||
|
// Now, we will test storage size limit. There should be following old archives created above:
|
||||||
|
// -5 min, -4 min, -3min, -2min, -1min, each of those have 10 bytes.
|
||||||
|
// if maxStorage = 20 bytes, The oldest four files should be removed,
|
||||||
|
// resulting two files of -1min, and newly created archive, 20 bytes in total.
|
||||||
|
FlowConfigurationArchiveManager archiveManager = new FlowConfigurationArchiveManager(flowFile.toPath(),
|
||||||
|
archiveDir.toPath(), maxTime, 20);
|
||||||
|
|
||||||
|
final File archive = archiveManager.archive();
|
||||||
|
assertTrue(archive.isFile());
|
||||||
|
|
||||||
|
assertFalse(oldArchives[0].exists());
|
||||||
|
assertFalse(oldArchives[1].exists());
|
||||||
|
assertFalse(oldArchives[2].exists());
|
||||||
|
assertFalse(oldArchives[3].exists());
|
||||||
|
assertTrue(oldArchives[4].isFile());
|
||||||
|
|
||||||
|
assertTrue("Original file should remain intact", flowFile.isFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -16,7 +16,10 @@
|
||||||
# Core Properties #
|
# Core Properties #
|
||||||
nifi.version=${nifi.version}
|
nifi.version=${nifi.version}
|
||||||
nifi.flow.configuration.file=${nifi.flow.configuration.file}
|
nifi.flow.configuration.file=${nifi.flow.configuration.file}
|
||||||
|
nifi.flow.configuration.archive.enabled=${nifi.flow.configuration.archive.enabled}
|
||||||
nifi.flow.configuration.archive.dir=${nifi.flow.configuration.archive.dir}
|
nifi.flow.configuration.archive.dir=${nifi.flow.configuration.archive.dir}
|
||||||
|
nifi.flow.configuration.archive.max.time=${nifi.flow.configuration.archive.max.time}
|
||||||
|
nifi.flow.configuration.archive.max.storage=${nifi.flow.configuration.archive.max.storage}
|
||||||
nifi.flowcontroller.autoResumeState=${nifi.flowcontroller.autoResumeState}
|
nifi.flowcontroller.autoResumeState=${nifi.flowcontroller.autoResumeState}
|
||||||
nifi.flowcontroller.graceful.shutdown.period=${nifi.flowcontroller.graceful.shutdown.period}
|
nifi.flowcontroller.graceful.shutdown.period=${nifi.flowcontroller.graceful.shutdown.period}
|
||||||
nifi.flowservice.writedelay.interval=${nifi.flowservice.writedelay.interval}
|
nifi.flowservice.writedelay.interval=${nifi.flowservice.writedelay.interval}
|
||||||
|
|
|
@ -283,14 +283,6 @@ public interface NiFiServiceFacade {
|
||||||
*/
|
*/
|
||||||
ControllerConfigurationEntity updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO);
|
ControllerConfigurationEntity updateControllerConfiguration(Revision revision, ControllerConfigurationDTO controllerConfigurationDTO);
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new archive of the flow configuration.
|
|
||||||
*
|
|
||||||
* @return snapshot
|
|
||||||
*/
|
|
||||||
ProcessGroupEntity createArchive();
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the process group status.
|
* Returns the process group status.
|
||||||
*
|
*
|
||||||
|
|
|
@ -196,7 +196,6 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.ws.rs.WebApplicationException;
|
import javax.ws.rs.WebApplicationException;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -1488,17 +1487,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
||||||
return flowEntity;
|
return flowEntity;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ProcessGroupEntity createArchive() {
|
|
||||||
try {
|
|
||||||
controllerFacade.createArchive();
|
|
||||||
} catch (final IOException e) {
|
|
||||||
logger.error("Failed to create an archive", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return getProcessGroup("root");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
|
public ControllerServiceEntity createControllerService(final Revision revision, final String groupId, final ControllerServiceDTO controllerServiceDTO) {
|
||||||
controllerServiceDTO.setParentGroupId(groupId);
|
controllerServiceDTO.setParentGroupId(groupId);
|
||||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
|
||||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||||
import org.apache.nifi.web.api.entity.HistoryEntity;
|
import org.apache.nifi.web.api.entity.HistoryEntity;
|
||||||
import org.apache.nifi.web.api.entity.NodeEntity;
|
import org.apache.nifi.web.api.entity.NodeEntity;
|
||||||
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
|
|
||||||
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
||||||
import org.apache.nifi.web.api.request.DateTimeParameter;
|
import org.apache.nifi.web.api.request.DateTimeParameter;
|
||||||
|
|
||||||
|
@ -116,65 +115,6 @@ public class ControllerResource extends ApplicationResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new archive of this flow controller. Note, this is a POST operation that returns a URI that is not representative of the thing that was actually created. The archive that is created
|
|
||||||
* cannot be referenced at a later time, therefore there is no corresponding URI. Instead the request URI is returned.
|
|
||||||
*
|
|
||||||
* Alternatively, we could have performed a PUT request. However, PUT requests are supposed to be idempotent and this endpoint is certainly not.
|
|
||||||
*
|
|
||||||
* @param httpServletRequest request
|
|
||||||
* @return A processGroupEntity.
|
|
||||||
*/
|
|
||||||
@POST
|
|
||||||
@Consumes(MediaType.APPLICATION_JSON)
|
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
|
||||||
@Path("archive")
|
|
||||||
// TODO - @PreAuthorize("hasRole('ROLE_DFM')")
|
|
||||||
@ApiOperation(
|
|
||||||
value = "Creates a new archive of this NiFi flow configuration",
|
|
||||||
notes = "This POST operation returns a URI that is not representative of the thing "
|
|
||||||
+ "that was actually created. The archive that is created cannot be referenced "
|
|
||||||
+ "at a later time, therefore there is no corresponding URI. Instead the "
|
|
||||||
+ "request URI is returned.",
|
|
||||||
response = ProcessGroupEntity.class,
|
|
||||||
authorizations = {
|
|
||||||
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
@ApiResponses(
|
|
||||||
value = {
|
|
||||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
|
||||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
|
||||||
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
|
|
||||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
public Response createArchive(@Context final HttpServletRequest httpServletRequest) {
|
|
||||||
|
|
||||||
if (isReplicateRequest()) {
|
|
||||||
return replicate(HttpMethod.POST);
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle expects request (usually from the cluster manager)
|
|
||||||
final boolean validationPhase = isValidationPhase(httpServletRequest);
|
|
||||||
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
|
|
||||||
// authorize access
|
|
||||||
serviceFacade.authorizeAccess(lookup -> {
|
|
||||||
authorizeController(RequestAction.WRITE);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (validationPhase) {
|
|
||||||
return generateContinueResponse().build();
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the archive
|
|
||||||
final ProcessGroupEntity entity = serviceFacade.createArchive();
|
|
||||||
|
|
||||||
// generate the response
|
|
||||||
final URI uri = URI.create(generateResourceUri("controller", "archive"));
|
|
||||||
return clusterContext(generateCreatedResponse(uri, entity)).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves the configuration for this NiFi.
|
* Retrieves the configuration for this NiFi.
|
||||||
*
|
*
|
||||||
|
|
|
@ -151,15 +151,6 @@ public class ControllerFacade implements Authorizable {
|
||||||
private DtoFactory dtoFactory;
|
private DtoFactory dtoFactory;
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates an archive of the current flow.
|
|
||||||
*
|
|
||||||
* @throws IOException if unable to save a copy of the flow
|
|
||||||
*/
|
|
||||||
public void createArchive() throws IOException {
|
|
||||||
flowService.archiveFlow();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the group id that contains the specified processor.
|
* Returns the group id that contains the specified processor.
|
||||||
*
|
*
|
||||||
|
|
|
@ -33,8 +33,6 @@
|
||||||
</div>
|
</div>
|
||||||
<div class="editable setting-field">
|
<div class="editable setting-field">
|
||||||
<input type="text" id="maximum-timer-driven-thread-count-field" class="setting-input"/>
|
<input type="text" id="maximum-timer-driven-thread-count-field" class="setting-input"/>
|
||||||
<span id="archive-flow-link" class="link">Back-up flow</span>
|
|
||||||
<div class="fa fa-question-circle" alt="Info" title="Archives the flow configuration."></div>
|
|
||||||
</div>
|
</div>
|
||||||
<div class="read-only setting-field">
|
<div class="read-only setting-field">
|
||||||
<span id="read-only-maximum-timer-driven-thread-count-field"></span>
|
<span id="read-only-maximum-timer-driven-thread-count-field"></span>
|
||||||
|
|
|
@ -114,7 +114,3 @@ div.settings-buttons {
|
||||||
div.settings-buttons div.button {
|
div.settings-buttons div.button {
|
||||||
float: left;
|
float: left;
|
||||||
}
|
}
|
||||||
|
|
||||||
#archive-flow-link {
|
|
||||||
margin-left: 10px;
|
|
||||||
}
|
|
|
@ -27,7 +27,6 @@ nf.Settings = (function () {
|
||||||
urls: {
|
urls: {
|
||||||
api: '../nifi-api',
|
api: '../nifi-api',
|
||||||
controllerConfig: '../nifi-api/controller/config',
|
controllerConfig: '../nifi-api/controller/config',
|
||||||
controllerArchive: '../nifi-api/controller/archive',
|
|
||||||
reportingTaskTypes: '../nifi-api/flow/reporting-task-types',
|
reportingTaskTypes: '../nifi-api/flow/reporting-task-types',
|
||||||
createReportingTask: '../nifi-api/controller/reporting-tasks',
|
createReportingTask: '../nifi-api/controller/reporting-tasks',
|
||||||
reportingTasks: '../nifi-api/flow/reporting-tasks'
|
reportingTasks: '../nifi-api/flow/reporting-tasks'
|
||||||
|
|
Loading…
Reference in New Issue