mirror of https://github.com/apache/nifi.git
NIFI-7521: Removed unused properties from nifi properties. Updated the defaults to use for File System's archive cleanup, flowfile checkpoint interval
This commit is contained in:
parent
99b3780ad9
commit
e371f4ac7c
|
@ -106,7 +106,6 @@ public abstract class NiFiProperties {
|
|||
public static final String FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION = "nifi.flowfile.repository.wal.implementation";
|
||||
public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync";
|
||||
public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory";
|
||||
public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions";
|
||||
public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY = "nifi.flowfile.repository.encryption.key";
|
||||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_ID = "nifi.flowfile.repository.encryption.key.id";
|
||||
|
@ -114,10 +113,6 @@ public abstract class NiFiProperties {
|
|||
public static final String FLOWFILE_REPOSITORY_ENCRYPTION_KEY_PROVIDER_LOCATION = "nifi.flowfile.repository.encryption.key.provider.location";
|
||||
public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
|
||||
public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
|
||||
public static final String SWAP_IN_THREADS = "nifi.swap.in.threads";
|
||||
public static final String SWAP_IN_PERIOD = "nifi.swap.in.period";
|
||||
public static final String SWAP_OUT_THREADS = "nifi.swap.out.threads";
|
||||
public static final String SWAP_OUT_PERIOD = "nifi.swap.out.period";
|
||||
|
||||
// provenance properties
|
||||
public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation";
|
||||
|
@ -181,7 +176,6 @@ public abstract class NiFiProperties {
|
|||
public static final String SECURITY_USER_KNOX_AUDIENCES = "nifi.security.user.knox.audiences";
|
||||
|
||||
// web properties
|
||||
public static final String WEB_WAR_DIR = "nifi.web.war.directory";
|
||||
public static final String WEB_HTTP_PORT = "nifi.web.http.port";
|
||||
public static final String WEB_HTTP_PORT_FORWARDING = "nifi.web.http.port.forwarding";
|
||||
public static final String WEB_HTTP_HOST = "nifi.web.http.host";
|
||||
|
@ -278,16 +272,10 @@ public abstract class NiFiProperties {
|
|||
public static final String DEFAULT_COMPONENT_DOCS_DIRECTORY = "./work/docs/components";
|
||||
public static final String DEFAULT_NAR_LIBRARY_DIR = "./lib";
|
||||
public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions";
|
||||
public static final String DEFAULT_FLOWFILE_REPO_PARTITIONS = "256";
|
||||
public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "2 min";
|
||||
public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs";
|
||||
public static final int DEFAULT_MAX_FLOWFILES_PER_CLAIM = 100;
|
||||
public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "1 MB";
|
||||
public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
|
||||
public static final String DEFAULT_SWAP_STORAGE_LOCATION = "./flowfile_repository/swap";
|
||||
public static final String DEFAULT_SWAP_IN_PERIOD = "1 sec";
|
||||
public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec";
|
||||
public static final int DEFAULT_SWAP_IN_THREADS = 4;
|
||||
public static final int DEFAULT_SWAP_OUT_THREADS = 4;
|
||||
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
|
||||
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
|
||||
public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
|
||||
|
@ -310,9 +298,6 @@ public abstract class NiFiProperties {
|
|||
// cluster common defaults
|
||||
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
|
||||
public static final int DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_MISSABLE_MAX = 8;
|
||||
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_BROADCAST_DELAY = "500 ms";
|
||||
public static final int DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS = 3;
|
||||
public static final String DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS_DELAY = "1 sec";
|
||||
public static final String DEFAULT_CLUSTER_NODE_READ_TIMEOUT = "5 sec";
|
||||
public static final String DEFAULT_CLUSTER_NODE_CONNECTION_TIMEOUT = "5 sec";
|
||||
public static final int DEFAULT_CLUSTER_NODE_MAX_CONCURRENT_REQUESTS = 100;
|
||||
|
@ -320,7 +305,6 @@ public abstract class NiFiProperties {
|
|||
// cluster node defaults
|
||||
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 10;
|
||||
public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_MAX_THREADS = 50;
|
||||
public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 secs";
|
||||
public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
|
||||
|
||||
// cluster load balance defaults
|
||||
|
@ -418,30 +402,6 @@ public abstract class NiFiProperties {
|
|||
}
|
||||
}
|
||||
|
||||
public int getSwapInThreads() {
|
||||
return getIntegerProperty(SWAP_IN_THREADS, DEFAULT_SWAP_IN_THREADS);
|
||||
}
|
||||
|
||||
public int getSwapOutThreads() {
|
||||
final String value = getProperty(SWAP_OUT_THREADS);
|
||||
if (value == null) {
|
||||
return DEFAULT_SWAP_OUT_THREADS;
|
||||
}
|
||||
|
||||
try {
|
||||
return Integer.parseInt(getProperty(SWAP_OUT_THREADS));
|
||||
} catch (final Exception e) {
|
||||
return DEFAULT_SWAP_OUT_THREADS;
|
||||
}
|
||||
}
|
||||
|
||||
public String getSwapInPeriod() {
|
||||
return getProperty(SWAP_IN_PERIOD, DEFAULT_SWAP_IN_PERIOD);
|
||||
}
|
||||
|
||||
public String getSwapOutPeriod() {
|
||||
return getProperty(SWAP_OUT_PERIOD, DEFAULT_SWAP_OUT_PERIOD);
|
||||
}
|
||||
|
||||
public String getAdministrativeYieldDuration() {
|
||||
return getProperty(ADMINISTRATIVE_YIELD_DURATION, DEFAULT_ADMINISTRATIVE_YIELD_DURATION);
|
||||
|
@ -552,17 +512,6 @@ public abstract class NiFiProperties {
|
|||
return Boolean.parseBoolean(rawAutoResumeState);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of partitions that should be used for the FlowFile
|
||||
* Repository
|
||||
*
|
||||
* @return the number of partitions
|
||||
*/
|
||||
public int getFlowFileRepositoryPartitions() {
|
||||
final String rawProperty = getProperty(FLOWFILE_REPOSITORY_PARTITIONS,
|
||||
DEFAULT_FLOWFILE_REPO_PARTITIONS);
|
||||
return Integer.parseInt(rawProperty);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of milliseconds between FlowFileRepository
|
||||
|
@ -571,8 +520,7 @@ public abstract class NiFiProperties {
|
|||
* @return the number of milliseconds between checkpoint events
|
||||
*/
|
||||
public String getFlowFileRepositoryCheckpointInterval() {
|
||||
return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL,
|
||||
DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL);
|
||||
return getProperty(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL, DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -85,7 +85,8 @@ import java.util.regex.Pattern;
|
|||
public class FileSystemRepository implements ContentRepository {
|
||||
|
||||
public static final int SECTIONS_PER_CONTAINER = 1024;
|
||||
public static final long MIN_CLEANUP_INTERVAL_MILLIS = 1000;
|
||||
public static final long MIN_CLEANUP_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(1L);
|
||||
public static final long DEFAULT_CLEANUP_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1L);
|
||||
public static final String ARCHIVE_DIR_NAME = "archive";
|
||||
// 100 MB cap for the configurable NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE property to prevent
|
||||
// unnecessarily large resource claim files
|
||||
|
@ -1707,7 +1708,7 @@ public class FileSystemRepository implements ContentRepository {
|
|||
* warning will be logged and the method will return minimum value of 1000
|
||||
*/
|
||||
private long determineCleanupInterval(NiFiProperties properties) {
|
||||
long cleanupInterval = MIN_CLEANUP_INTERVAL_MILLIS;
|
||||
long cleanupInterval = DEFAULT_CLEANUP_INTERVAL_MILLIS;
|
||||
String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
|
||||
if (archiveCleanupFrequency != null) {
|
||||
try {
|
||||
|
|
|
@ -102,7 +102,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
|
|||
final long checkpointDelayMillis;
|
||||
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
|
||||
final List<File> recoveryFiles = new ArrayList<>();
|
||||
private final int numPartitions;
|
||||
final ScheduledExecutorService checkpointExecutor;
|
||||
|
||||
private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
|
||||
|
@ -143,7 +142,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
|
|||
public WriteAheadFlowFileRepository() {
|
||||
alwaysSync = false;
|
||||
checkpointDelayMillis = 0L;
|
||||
numPartitions = 0;
|
||||
checkpointExecutor = null;
|
||||
walImplementation = null;
|
||||
nifiProperties = null;
|
||||
|
@ -179,7 +177,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
|
|||
}
|
||||
|
||||
|
||||
numPartitions = nifiProperties.getFlowFileRepositoryPartitions();
|
||||
checkpointDelayMillis = FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
|
||||
|
||||
checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
|
@ -222,13 +219,13 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
|
|||
.map(File::toPath)
|
||||
.collect(Collectors.toCollection(TreeSet::new));
|
||||
|
||||
wal = new MinimalLockingWriteAheadLog<>(paths, numPartitions, serdeFactory, this);
|
||||
wal = new MinimalLockingWriteAheadLog<>(paths, 1, serdeFactory, this);
|
||||
} else {
|
||||
throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property '" + WRITE_AHEAD_LOG_IMPL + "' has an invalid value of '" + walImplementation
|
||||
+ "'. Please update nifi.properties to indicate a valid value for this property.");
|
||||
}
|
||||
|
||||
logger.info("Initialized FlowFile Repository using {} partitions", numPartitions);
|
||||
logger.info("Initialized FlowFile Repository");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -247,7 +244,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(final Set<ResourceClaim> resourceClaims, final FlowFileSwapManager swapManager) throws IOException {
|
||||
public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(final Set<ResourceClaim> resourceClaims, final FlowFileSwapManager swapManager) {
|
||||
if (!(isSequentialAccessWAL(walImplementation))) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -56,8 +56,7 @@
|
|||
<nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
|
||||
<nifi.flowfile.repository.wal.implementation>org.apache.nifi.wali.SequentialAccessWriteAheadLog</nifi.flowfile.repository.wal.implementation>
|
||||
<nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory>
|
||||
<nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions>
|
||||
<nifi.flowfile.repository.checkpoint.interval>2 mins</nifi.flowfile.repository.checkpoint.interval>
|
||||
<nifi.flowfile.repository.checkpoint.interval>20 secs</nifi.flowfile.repository.checkpoint.interval>
|
||||
<nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync>
|
||||
<nifi.flowfile.repository.encryption.key.provider.implementation />
|
||||
<nifi.flowfile.repository.encryption.key.provider.location />
|
||||
|
@ -65,16 +64,11 @@
|
|||
<nifi.flowfile.repository.encryption.key />
|
||||
<nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
|
||||
<nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
|
||||
<nifi.swap.in.period>5 sec</nifi.swap.in.period>
|
||||
<nifi.swap.in.threads>1</nifi.swap.in.threads>
|
||||
<nifi.swap.out.period>5 sec</nifi.swap.out.period>
|
||||
<nifi.swap.out.threads>4</nifi.swap.out.threads>
|
||||
|
||||
<nifi.content.repository.implementation>org.apache.nifi.controller.repository.FileSystemRepository</nifi.content.repository.implementation>
|
||||
<nifi.content.claim.max.appendable.size>1 MB</nifi.content.claim.max.appendable.size>
|
||||
<nifi.content.claim.max.flow.files>100</nifi.content.claim.max.flow.files>
|
||||
<nifi.content.repository.directory.default>./content_repository</nifi.content.repository.directory.default>
|
||||
<nifi.content.repository.archive.max.retention.period>12 hours</nifi.content.repository.archive.max.retention.period>
|
||||
<nifi.content.repository.archive.max.retention.period>7 days</nifi.content.repository.archive.max.retention.period>
|
||||
<nifi.content.repository.archive.max.usage.percentage>50%</nifi.content.repository.archive.max.usage.percentage>
|
||||
<nifi.content.repository.archive.enabled>true</nifi.content.repository.archive.enabled>
|
||||
<nifi.content.repository.always.sync>false</nifi.content.repository.always.sync>
|
||||
|
@ -103,7 +97,6 @@
|
|||
|
||||
<!-- persistent provenance repository properties -->
|
||||
<nifi.provenance.repository.implementation>org.apache.nifi.provenance.WriteAheadProvenanceRepository</nifi.provenance.repository.implementation>
|
||||
<nifi.provenance.repository.debug.frequency>1_000_000</nifi.provenance.repository.debug.frequency>
|
||||
<nifi.provenance.repository.encryption.key.provider.implementation />
|
||||
<nifi.provenance.repository.encryption.key.provider.location />
|
||||
<nifi.provenance.repository.encryption.key.id />
|
||||
|
@ -200,14 +193,14 @@
|
|||
<!-- nifi.properties: cluster load balance properties -->
|
||||
<nifi.cluster.load.balance.host />
|
||||
<nifi.cluster.load.balance.port>6342</nifi.cluster.load.balance.port>
|
||||
<nifi.cluster.load.balance.connections.per.node>4</nifi.cluster.load.balance.connections.per.node>
|
||||
<nifi.cluster.load.balance.connections.per.node>1</nifi.cluster.load.balance.connections.per.node>
|
||||
<nifi.cluster.load.balance.max.thread.count>8</nifi.cluster.load.balance.max.thread.count>
|
||||
<nifi.cluster.load.balance.comms.timeout>30 sec</nifi.cluster.load.balance.comms.timeout>
|
||||
|
||||
<!-- nifi.properties: zookeeper properties -->
|
||||
<nifi.zookeeper.connect.string />
|
||||
<nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
|
||||
<nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
|
||||
<nifi.zookeeper.connect.timeout>10 secs</nifi.zookeeper.connect.timeout>
|
||||
<nifi.zookeeper.session.timeout>10 secs</nifi.zookeeper.session.timeout>
|
||||
<nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
|
||||
<nifi.zookeeper.auth.type />
|
||||
<nifi.zookeeper.kerberos.removeHostFromPrincipal />
|
||||
|
|
|
@ -61,7 +61,6 @@ nifi.h2.url.append=${nifi.h2.url.append}
|
|||
nifi.flowfile.repository.implementation=${nifi.flowfile.repository.implementation}
|
||||
nifi.flowfile.repository.wal.implementation=${nifi.flowfile.repository.wal.implementation}
|
||||
nifi.flowfile.repository.directory=${nifi.flowfile.repository.directory}
|
||||
nifi.flowfile.repository.partitions=${nifi.flowfile.repository.partitions}
|
||||
nifi.flowfile.repository.checkpoint.interval=${nifi.flowfile.repository.checkpoint.interval}
|
||||
nifi.flowfile.repository.always.sync=${nifi.flowfile.repository.always.sync}
|
||||
nifi.flowfile.repository.encryption.key.provider.implementation=${nifi.flowfile.repository.encryption.key.provider.implementation}
|
||||
|
@ -71,15 +70,10 @@ nifi.flowfile.repository.encryption.key=${nifi.flowfile.repository.encryption.ke
|
|||
|
||||
nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
|
||||
nifi.queue.swap.threshold=${nifi.queue.swap.threshold}
|
||||
nifi.swap.in.period=${nifi.swap.in.period}
|
||||
nifi.swap.in.threads=${nifi.swap.in.threads}
|
||||
nifi.swap.out.period=${nifi.swap.out.period}
|
||||
nifi.swap.out.threads=${nifi.swap.out.threads}
|
||||
|
||||
# Content Repository
|
||||
nifi.content.repository.implementation=${nifi.content.repository.implementation}
|
||||
nifi.content.claim.max.appendable.size=${nifi.content.claim.max.appendable.size}
|
||||
nifi.content.claim.max.flow.files=${nifi.content.claim.max.flow.files}
|
||||
nifi.content.repository.directory.default=${nifi.content.repository.directory.default}
|
||||
nifi.content.repository.archive.max.retention.period=${nifi.content.repository.archive.max.retention.period}
|
||||
nifi.content.repository.archive.max.usage.percentage=${nifi.content.repository.archive.max.usage.percentage}
|
||||
|
@ -93,7 +87,6 @@ nifi.content.repository.encryption.key=${nifi.content.repository.encryption.key}
|
|||
|
||||
# Provenance Repository Properties
|
||||
nifi.provenance.repository.implementation=${nifi.provenance.repository.implementation}
|
||||
nifi.provenance.repository.debug.frequency=${nifi.provenance.repository.debug.frequency}
|
||||
nifi.provenance.repository.encryption.key.provider.implementation=${nifi.provenance.repository.encryption.key.provider.implementation}
|
||||
nifi.provenance.repository.encryption.key.provider.location=${nifi.provenance.repository.encryption.key.provider.location}
|
||||
nifi.provenance.repository.encryption.key.id=${nifi.provenance.repository.encryption.key.id}
|
||||
|
|
Loading…
Reference in New Issue