diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 511707788d..eb93a62220 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -182,6 +182,49 @@ Antivirus software can take a long time to scan large directories and the numero
* `provenance_repository`
* `state`
+[[logging_configuration]]
+== Logging Configuration
+NiFi uses link:https://logback.qos.ch/[logback^] as the runtime logging implementation. The `conf` directory contains a
+standard `logback.xml` configuration with default appender and level settings. The
+link:https://logback.qos.ch/manual/index.html[logback manual] provides a complete reference of available options.
+
+=== Standard Log Files
+The standard logback configuration includes the following appender definitions and associated log files:
+
+[options="header"]
+|=========================
+| File | Description
+| `nifi-app.log` | Application log containing framework and component messages
+| `nifi-bootstrap.log` | Bootstrap log containing startup and shutdown messages
+| `nifi-deprecation.log` | Deprecation log containing warnings for deprecated components and features
+| `nifi-request.log` | HTTP request log containing user interface and REST API access messages
+| `nifi-user.log` | User log containing authentication and authorization messages
+|=========================
+
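+The following abbreviated sketch shows how an appender definition in `logback.xml` associates a named appender with
+one of these files, using `nifi-app.log` for illustration; the packaged configuration also includes rolling policy and
+encoder settings not shown here:
+
+[source, xml]
+----
+<appender name="APP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app.log</file>
+    <!-- rolling policy and encoder settings omitted -->
+</appender>
+----
+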
+=== Deprecation Logging
+The `nifi-deprecation.log` contains warning messages describing components and features that will be removed in
+subsequent versions. Deprecation warnings should be evaluated and addressed to avoid breaking changes when upgrading to
+a new major version. Resolving deprecation warnings involves upgrading to new components, changing component property
+settings, or refactoring custom component classes.
+
+Deprecation logging provides a method for checking compatibility before upgrading from one major release version to
+another. Upgrading to the latest minor release version will provide the most accurate set of deprecation warnings.
+
+It is important to note that deprecation logging applies to both components and features. Logging for deprecated
+features requires a runtime reference to the impacted property or method. Disabled components with deprecated properties
+or methods will not generate deprecation logs. For this reason, it is important to run all configured components long
+enough to exercise standard flow behavior.
+
+Deprecation logging can generate repeated messages depending on component configuration and usage patterns. Deprecation
+logging for a specific component class can be disabled by adding a `logger` element to `logback.xml`. The `name`
+attribute must start with `deprecation`, followed by the fully qualified component class name. Setting the `level`
+attribute to `OFF` disables deprecation logging for the specified component, as shown in the following example for a
+hypothetical `ListenLegacyProtocol` processor:
+
+[source, xml]
+----
+<logger name="deprecation.org.apache.nifi.processors.standard.ListenLegacyProtocol" level="OFF" />
+----
+
[[security_configuration]]
== Security Configuration
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index 7451ff0987..fe301e41ae 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -2637,19 +2637,83 @@ logged to help avoid this bad practice.
[[deprecation]]
-== Deprecating a Component
+== Deprecating Components and Features
+Deprecating features is an important part of the software development lifecycle, providing an upgrade path for
+developers and users to follow.
+
+Apache NiFi follows the link:https://semver.org[Semantic Versioning Specification 2.0.0^] for features identified as
+part of the public version contract, as described in the
+link:https://cwiki.apache.org/confluence/display/NIFI/Version+Scheme+and+API+Compatibility[Version Scheme and API Compatibility^]
+documentation.
+
+Components and features that fit under the public version contract require deprecation marking prior to removal. This
+approach allows developers to implement changes as part of minor version upgrades, in preparation for future removal
+of features in a subsequent major version.
+
+=== Component Deprecation
+
Sometimes it may be desirable to deprecate a component. Whenever this occurs the developer may use the
`@DeprecationNotice` annotation to indicate that a component has been deprecated, allowing the developer
- to describe a reason for the deprecation and suggest alternative components. An example of how to do this can
+ to describe a `reason` for the deprecation and suggest alternative components. An example of how to do this can
be found below:
[source, java]
----
- @DeprecationNotice(alternatives = {ListenSyslog.class}, classNames = {"org.apache.nifi.processors.standard.ListenRELP"}, reason = "Technology has been superseded", )
- public class ListenOldProtocol extends AbstractProcessor {
+@DeprecationNotice(
+ reason = "Legacy Protocol has been superseded",
+ alternatives = {ListenSyslog.class},
+ classNames = {"org.apache.nifi.processors.standard.ListenRELP"}
+)
+public class ListenLegacyProtocol extends AbstractProcessor {}
+----
+The `alternatives` property can be used to define an array of recommended replacement Components, while `classNames`
+serves the same purpose through an array of class name strings.
+
+Adding the `@DeprecationNotice` annotation renders a warning message in generated documentation and also logs the
+following warning when the Flow Configuration includes the component:
+
+----
+Added Deprecated Component ListenLegacyProtocol[id=929a52c7-1e3e-423e-b303-6ca2ef657617] See alternatives [ListenSyslog,ListenRELP]
+----
+
+=== Feature Deprecation
+
+Feature deprecation covers changes such as replacing component configuration strategies, introducing new repository
+classes, and refactoring a Controller Service interface. Removing component properties can create invalid Flow
+Configurations after upgrading, and removing public methods from a Controller Service interface can break components
+compiled against previous versions. For these reasons, introducing replacement properties and methods must include a
+deprecation strategy that maintains compatibility when upgrading from one minor version to another.
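+
+As a minimal sketch of such a strategy, a Controller Service interface can retain a deprecated method as a `default`
+implementation that delegates to its replacement, keeping components compiled against the previous version working.
+The `RecordService` and `ServiceContext` names below are illustrative rather than part of the NiFi API:
+
+[source, java]
+----
+public interface RecordService extends ControllerService {
+    /**
+     * @deprecated replaced by {@link #createContext()}
+     */
+    @Deprecated
+    default ServiceContext createServiceContext() {
+        // Delegating to the replacement method preserves behavior for existing callers
+        return createContext();
+    }
+
+    ServiceContext createContext();
+}
+----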
+
+Annotating methods and properties with the Java `@Deprecated` annotation provides a warning to software developers, but
+does not provide any information to users. Adding selective deprecation logging provides a method for users to determine
+whether the current Flow Configuration uses deprecated properties or features.
+
+=== Deprecation Logging
+
+The `nifi-deprecation-log` module provides an abstraction for logging deprecation warnings. The `DeprecationLogger`
+interface has a `warn()` method that accepts a message string with a variable number of arguments. The standard
+implementation delegates to SLF4J, supporting standard placeholder resolution for message arguments, and also logs a
+`DeprecationException` with the referenced class, supporting tracing to the usage location.
+
+Instantiating a `DeprecationLogger` follows the same conventions as instantiating an SLF4J `Logger`:
+
+[source, java]
+----
+private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(Service.class);
+----
+
+Logging warnings for deprecated extension component properties and methods should include the simple class name and the
+component identifier for tracing. Warning messages should also reference recommended alternatives.
+
+[source, java]
+----
+deprecationLogger.warn(
+ "{}[id={}] createServiceContext() should be replaced with createContext()",
+ getClass().getSimpleName(),
+ getIdentifier()
+);
----
-As you can see, the alternatives can be used to define and array of alternative Components, while classNames can be
-used to represent the similar content through an array of strings.
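+
+With the example above, placeholder resolution produces a warning along the following lines in `nifi-deprecation.log`,
+with the class name and component identifier varying by instance:
+
+----
+Service[id=929a52c7-1e3e-423e-b303-6ca2ef657617] createServiceContext() should be replaced with createContext()
+----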
[[build]]
== Build Options
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
index 5fff3a6f64..11d64aad89 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
@@ -214,8 +214,8 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
- RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os);
- RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), schema, osNotFound) : null;
+ RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, input);
+ RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), schema, osNotFound, input) : null;
Record record;
Relationship targetRelationship = REL_NOT_FOUND;
writer.beginRecordSet();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 36f79d5399..7eaeb6782f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -43,6 +43,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
+<dependency>
+<groupId>org.apache.nifi</groupId>
+<artifactId>nifi-deprecation-log</artifactId>
+</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-components</artifactId>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 8f729acffe..caf5d0e01f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.flow;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -24,6 +25,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@@ -52,6 +54,8 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
@@ -66,6 +70,8 @@ import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.ParameterProvider;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.PublicPort;
@@ -73,6 +79,7 @@ import org.apache.nifi.remote.StandardPublicPort;
import org.apache.nifi.remote.StandardRemoteProcessGroup;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
@@ -82,6 +89,7 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -338,8 +346,10 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
}
if (firstTimeAdded) {
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
+ final Processor processor = procNode.getProcessor();
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
+ logDeprecationNotice(processor);
} catch (final Exception e) {
if (registerLogObserver) {
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
@@ -385,17 +395,19 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
LogRepositoryFactory.getRepository(taskNode.getIdentifier()).setLogger(taskNode.getLogger());
if (firstTimeAdded) {
- final Class<?> taskClass = taskNode.getReportingTask().getClass();
- final String identifier = taskNode.getReportingTask().getIdentifier();
+ final ReportingTask reportingTask = taskNode.getReportingTask();
+ final Class<? extends ConfigurableComponent> taskClass = reportingTask.getClass();
+ final String identifier = reportingTask.getIdentifier();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), taskClass, identifier)) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, reportingTask);
+ logDeprecationNotice(reportingTask);
if (flowController.isInitialized()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, reportingTask, taskNode.getConfigurationContext());
}
} catch (final Exception e) {
- throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
+ throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + reportingTask, e);
}
}
@@ -440,17 +452,19 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
LogRepositoryFactory.getRepository(parameterProviderNode.getIdentifier()).setLogger(parameterProviderNode.getLogger());
if (firstTimeAdded) {
- final Class<?> taskClass = parameterProviderNode.getParameterProvider().getClass();
- final String identifier = parameterProviderNode.getParameterProvider().getIdentifier();
+ final ParameterProvider parameterProvider = parameterProviderNode.getParameterProvider();
+ final Class<? extends ConfigurableComponent> parameterProviderClass = parameterProvider.getClass();
+ final String identifier = parameterProvider.getIdentifier();
- try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), taskClass, identifier)) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, parameterProviderNode.getParameterProvider());
+ try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), parameterProviderClass, identifier)) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, parameterProvider);
+ logDeprecationNotice(parameterProvider);
if (flowController.isInitialized()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, parameterProviderNode.getParameterProvider());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, parameterProvider);
}
} catch (final Exception e) {
- throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + parameterProviderNode.getParameterProvider(), e);
+ throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + parameterProvider, e);
}
}
@@ -561,6 +575,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
final ControllerService serviceImpl = serviceNode.getControllerServiceImplementation();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, serviceImpl.getClass(), serviceImpl.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, serviceImpl);
+ logDeprecationNotice(serviceImpl);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + serviceImpl, e);
}
@@ -586,5 +601,26 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
return flowController;
}
+ private void logDeprecationNotice(final ConfigurableComponent component) {
+ final Class<? extends ConfigurableComponent> componentClass = component.getClass();
+ final DeprecationNotice deprecationNotice = componentClass.getAnnotation(DeprecationNotice.class);
+ if (deprecationNotice != null) {
+ final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(componentClass);
+ final List<String> alternatives = new ArrayList<>();
+
+ for (final Class<? extends ConfigurableComponent> alternativeClass : deprecationNotice.alternatives()) {
+ alternatives.add(alternativeClass.getSimpleName());
+ }
+ for (final String className : deprecationNotice.classNames()) {
+ alternatives.add(className);
+ }
+
+ deprecationLogger.warn("Added Deprecated Component {}[id={}] See alternatives {}",
+ componentClass.getSimpleName(),
+ component.getIdentifier(),
+ alternatives
+ );
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 9dacc0b024..d2b172ce36 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -27,6 +27,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
+<dependency>
+<groupId>org.apache.nifi</groupId>
+<artifactId>nifi-deprecation-log</artifactId>
+</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java
index c873a75718..d2f3dda9c5 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/nar/hadoop/HDFSNarProvider.java
@@ -17,6 +17,8 @@
package org.apache.nifi.nar.hadoop;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flow.resource.ImmutableExternalResourceDescriptor;
import org.apache.nifi.flow.resource.NarProviderAdapterInitializationContext;
import org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider;
@@ -31,9 +33,16 @@ import java.util.stream.Collectors;
@Deprecated
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class HDFSNarProvider extends HDFSExternalResourceProvider implements NarProvider {
+ private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(HDFSNarProvider.class);
+
+ private static final String IMPLEMENTATION_PROPERTY = "nifi.nar.library.provider.hdfs.implementation";
@Override
public void initialize(final NarProviderInitializationContext context) {
+ deprecationLogger.warn("{} should be replaced with HDFSExternalResourceProvider for [{}] in nifi.properties",
+ getClass().getSimpleName(),
+ IMPLEMENTATION_PROPERTY
+ );
initialize(new NarProviderAdapterInitializationContext(context));
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 907a1412f4..cce39715ff 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -41,6 +41,8 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -227,6 +229,8 @@ public class ListHDFS extends AbstractHadoopProcessor {
.description("All FlowFiles are transferred to this relationship")
.build();
+ private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(ListHDFS.class);
+
private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L;
private volatile long lastRunTimestamp = -1L;
@@ -277,6 +281,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
+ if (context.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet()) {
+ deprecationLogger.warn("{}[id={}] [{}] Property is not used",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ DISTRIBUTED_CACHE_SERVICE.getDisplayName()
+ );
+ }
+
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
index ec7ad3c7f0..efc4e64b86 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml
@@ -30,6 +30,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
+<dependency>
+<groupId>org.apache.nifi</groupId>
+<artifactId>nifi-deprecation-log</artifactId>
+</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-data-provenance-utils</artifactId>
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 24f95f67f1..dd1b414329 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -31,6 +31,8 @@ import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.expiration.ExpirationAction;
@@ -146,6 +148,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
private static final float ROLLOVER_HIGH_WATER = 0.99f;
private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
+ private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(PersistentProvenanceRepository.class);
private final long maxPartitionMillis;
private final long maxPartitionBytes;
@@ -226,6 +229,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
public PersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException {
+ deprecationLogger.warn("{} should be replaced with WriteAheadProvenanceRepository for [{}] in nifi.properties",
+ getClass().getSimpleName(),
+ NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS
+ );
+
if (configuration.getStorageDirectories().isEmpty()) {
throw new IllegalArgumentException("Must specify at least one storage directory");
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/ITestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/ITestPersistentProvenanceRepository.java
deleted file mode 100644
index a3cc904dab..0000000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/ITestPersistentProvenanceRepository.java
+++ /dev/null
@@ -1,2063 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.core.SimpleAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.queryparser.classic.ParseException;
-import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.nifi.authorization.AccessDeniedException;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.provenance.lineage.EventNode;
-import org.apache.nifi.provenance.lineage.Lineage;
-import org.apache.nifi.provenance.lineage.LineageEdge;
-import org.apache.nifi.provenance.lineage.LineageNode;
-import org.apache.nifi.provenance.lineage.LineageNodeType;
-import org.apache.nifi.provenance.lucene.IndexingAction;
-import org.apache.nifi.provenance.search.Query;
-import org.apache.nifi.provenance.search.QueryResult;
-import org.apache.nifi.provenance.search.QuerySubmission;
-import org.apache.nifi.provenance.search.SearchTerms;
-import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.serialization.RecordWriters;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.file.FileUtils;
-
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPOutputStream;
-
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-
-@Timeout(value = 10)
-public class ITestPersistentProvenanceRepository {
-
- private PersistentProvenanceRepository repo;
- private static RepositoryConfiguration config;
-
- public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
- private EventReporter eventReporter;
- private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<>());
-
- private static int headerSize;
- private static int recordSize;
- private static int recordSize2;
-
- private static RepositoryConfiguration createConfiguration() {
- config = new RepositoryConfiguration();
- config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID()));
- config.setCompressOnRollover(true);
- config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
- config.setCompressionBlockBytes(100);
- return config;
- }
-
- @BeforeAll
- public static void findJournalSizes() throws IOException {
- // determine header and record size
-
- final Map<String, String> attributes = new HashMap<>();
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- final ProvenanceEventRecord record = builder.build();
- builder.setComponentId("2345");
- final ProvenanceEventRecord record2 = builder.build();
-
- final File tempRecordFile = File.createTempFile("ProvenanceRepository", ".record");
- tempRecordFile.deleteOnExit();
- System.out.println("findJournalSizes position 0 = " + tempRecordFile.length());
-
- final AtomicLong idGenerator = new AtomicLong(0L);
- final RecordWriter writer = RecordWriters.newSchemaRecordWriter(tempRecordFile, idGenerator, false, false);
- writer.writeHeader(12345L);
- writer.flush();
- headerSize = Long.valueOf(tempRecordFile.length()).intValue();
- writer.writeRecord(record);
- writer.flush();
- recordSize = Long.valueOf(tempRecordFile.length()).intValue() - headerSize;
- writer.writeRecord(record2);
- writer.flush();
- recordSize2 = Long.valueOf(tempRecordFile.length()).intValue() - headerSize - recordSize;
- writer.close();
-
- System.out.println("headerSize =" + headerSize);
- System.out.println("recordSize =" + recordSize);
- System.out.println("recordSize2=" + recordSize2);
- }
-
- @BeforeEach
- public void printTestName() {
- reportedEvents.clear();
- eventReporter = new EventReporter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reportEvent(Severity severity, String category, String message) {
- reportedEvents.add(new ReportedEvent(severity, category, message));
- System.out.println(severity + " : " + category + " : " + message);
- }
- };
- }
-
- @AfterEach
- public void closeRepo() throws IOException {
- if (repo == null) {
- return;
- }
-
- try {
- repo.close();
- } catch (final IOException ioe) {
- }
-
- // Delete all of the storage files. We do this in order to clean up the tons of files that
- // we create but also to ensure that we have closed all of the file handles. If we leave any
- // streams open, for instance, this will throw an IOException, causing our unit test to fail.
- if (config != null) {
- for (final File storageDir : config.getStorageDirectories().values()) {
- int i;
- for (i = 0; i < 3; i++) {
- try {
- FileUtils.deleteFile(storageDir, true);
- break;
- } catch (final IOException ioe) {
- // if there is a virus scanner, etc. running in the background we may not be able to
- // delete the file. Wait a sec and try again.
- if (i == 2) {
- throw ioe;
- } else {
- try {
- System.out.println("file: " + storageDir + " exists=" + storageDir.exists());
- FileUtils.deleteFile(storageDir, true);
- break;
- } catch (final IOException ioe2) {
- // if there is a virus scanner, etc. running in the background we may not be able to
- // delete the file. Wait a sec and try again.
- if (i == 2) {
- throw ioe2;
- } else {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
- }
- }
- }
- }
- }
- }
- }
- }
- }
-
- private EventReporter getEventReporter() {
- return eventReporter;
- }
-
- @Test
- @EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
- public void testPerformance() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1024 * 1024 * 1024L);
- config.setMaxEventFileLife(20, TimeUnit.SECONDS);
- config.setCompressOnRollover(false);
- config.setJournalCount(10);
- config.setQueryThreadPoolSize(10);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("uuid", UUID.randomUUID().toString());
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- final ProvenanceEventRecord record = builder.build();
-
- final Runnable r = new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < 100000; i++) {
- repo.registerEvent(record);
- }
- }
- };
-
- final Thread[] threads = new Thread[10];
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(r);
- }
-
- final long start = System.nanoTime();
- for (final Thread t : threads) {
- t.start();
- }
-
- for (final Thread t : threads) {
- t.join();
- }
- final long nanos = System.nanoTime() - start;
-
- final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
- final long recsPerMilli = 1000000 / millis;
- final long recsPerSec = recsPerMilli * 1000;
- System.out.println(millis + " millis to insert 1M records (" + recsPerSec + " recs/sec)");
-
- System.out.println("Closing and re-initializing");
- repo.close();
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
- System.out.println("Re-initialized");
-
- final long fetchStart = System.nanoTime();
- final List<ProvenanceEventRecord> records = repo.getEvents(0L, 1000000);
- final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fetchStart);
- assertEquals(1000000, records.size());
- final long fetchRecsPerMilli = 1000000 / fetchMillis;
- final long fetchRecsPerSec = fetchRecsPerMilli * 1000L;
- System.out.println(fetchMillis + " millis to fetch 1M records (" + fetchRecsPerSec + " recs/sec)");
-
- repo.close();
- }
-
- private NiFiProperties properties = new NiFiProperties() {
- @Override
- public String getProperty(String key) {
- if (key.equals(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)) {
- return "true";
- } else if (key.equals(NiFiProperties.PROVENANCE_ROLLOVER_TIME)) {
- return "2000 millis";
- } else if (key.equals(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default")) {
- createConfiguration();
- return config.getStorageDirectories().values().iterator().next().getAbsolutePath();
- } else {
- return null;
- }
- }
-
- @Override
- public Set<String> getPropertyKeys() {
- return new HashSet<>(Arrays.asList(
- NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER,
- NiFiProperties.PROVENANCE_ROLLOVER_TIME,
- NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + ".default"));
- }
- };
-
- @Test
- public void constructorNoArgs() {
- TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository();
- assertEquals(0, tppr.getRolloverCheckMillis());
- }
-
- @Test
- public void constructorNiFiProperties() throws IOException {
- TestablePersistentProvenanceRepository tppr = new TestablePersistentProvenanceRepository(properties);
- assertEquals(10000, tppr.getRolloverCheckMillis());
- }
-
- @Test
- public void constructorConfig() throws IOException {
- RepositoryConfiguration configuration = RepositoryConfiguration.create(properties);
- new TestablePersistentProvenanceRepository(configuration, 20000);
- }
-
- @Test
- public void testAddAndRecover() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1L);
- config.setMaxEventFileLife(1, TimeUnit.SECONDS);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("uuid", UUID.randomUUID().toString());
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- final ProvenanceEventRecord record = builder.build();
-
- for (int i = 0; i < 10; i++) {
- repo.registerEvent(record);
- }
-
- Thread.sleep(1000L);
-
- repo.close();
- Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
- final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12);
-
- //just test however many were actually recovered since it is timing sensitive
- final int numRecovered = recoveredRecords.size();
- for (int i = 0; i < numRecovered; i++) {
- final ProvenanceEventRecord recovered = recoveredRecords.get(i);
- assertEquals(i, recovered.getEventId());
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());
- assertEquals(attributes, recovered.getAttributes());
- }
- }
-
- @Test
- public void testAddToMultipleLogsAndRecover() throws IOException, InterruptedException {
- final List<SearchableField> searchableFields = new ArrayList<>();
- searchableFields.add(SearchableFields.ComponentID);
-
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setMaxEventFileLife(2, TimeUnit.SECONDS);
- config.setSearchableFields(searchableFields);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("uuid", UUID.randomUUID().toString());
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- final ProvenanceEventRecord record = builder.build();
-
- for (int i = 0; i < 10; i++) {
- repo.registerEvent(record);
- }
-
- builder.setComponentId("XXXX"); // create a different component id so that we can make sure we query this record.
-
- attributes.put("uuid", "11111111-1111-1111-1111-111111111111");
-
- builder.fromFlowFile(createFlowFile(11L, 11L, attributes));
- repo.registerEvent(builder.build());
-
- repo.waitForRollover();
- Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
-
- // Create a new repo and add another record with component id XXXX so that we can ensure that it's added to a different
- // log file than the previous one.
- attributes.put("uuid", "22222222-2222-2222-2222-222222222222");
- builder.fromFlowFile(createFlowFile(11L, 11L, attributes));
- repo.registerEvent(builder.build());
- repo.waitForRollover();
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "XXXX", null));
- query.setMaxResults(100);
-
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(2, result.getMatchingEvents().size());
- for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
- System.out.println(match);
- }
- }
-
- @Test
- public void testIndexOnRolloverWithImmenseAttribute() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setSearchableAttributes(SearchableFieldParser.extractSearchableFields("immense", false));
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- int immenseAttrSize = 33000; // must be greater than 32766 for a meaningful test
- StringBuilder immenseBldr = new StringBuilder(immenseAttrSize);
- for (int i = 0; i < immenseAttrSize; i++) {
- immenseBldr.append('0');
- }
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
- attributes.put("immense", immenseBldr.toString());
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.newSearchableAttribute("immense"), "000*", null));
- query.setMaxResults(100);
-
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(10, result.getMatchingEvents().size());
- }
-
- @Test
- public void testIndexOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "000000*", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
- query.setMaxResults(100);
-
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(10, result.getMatchingEvents().size());
- for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
- System.out.println(match);
- }
- }
-
- @Test
- public void testCompressOnRollover() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setCompressOnRollover(true);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
- final File storageDir = config.getStorageDirectories().values().iterator().next();
- final File compressedLogFile = new File(storageDir, "0.prov.gz");
- assertTrue(compressedLogFile.exists());
- }
-
- @Test
- public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(30, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L * 10);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L * 10);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "10000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- final Query query = new Query(UUID.randomUUID().toString());
- // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
- query.setMaxResults(100);
-
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(10, result.getMatchingEvents().size());
- for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
- System.out.println(match);
- }
-
- Thread.sleep(2000L);
-
- config.setMaxStorageCapacity(100L);
- config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
- repo.purgeOldEvents();
- Thread.sleep(2000L);
-
- final QueryResult newRecordSet = repo.queryEvents(query, createUser());
- assertTrue(newRecordSet.getMatchingEvents().isEmpty());
- }
-
-
- @Test
- public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.addStorageDirectory("2", new File("target/storage/" + UUID.randomUUID()));
- config.setMaxRecordLife(30, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(1, TimeUnit.SECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- for (int j = 0; j < 3; j++) {
- attributes.put("iteration", String.valueOf(j));
-
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
-
- for (int i = 0; i < 10; i++) {
- String uuidSuffix = String.valueOf(i + j * 10);
- if (uuidSuffix.length() < 2) {
- uuidSuffix = "0" + uuidSuffix;
- }
-
- attributes.put("uuid", "00000000-0000-0000-0000-0000000000" + uuidSuffix);
- builder.fromFlowFile(createFlowFile(i + j * 10, 3000L, attributes));
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
- }
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
- query.setMaxResults(100);
-
- final QuerySubmission submission = repo.submitQuery(query, createUser());
- while (!submission.getResult().isFinished()) {
- Thread.sleep(100L);
- }
-
- assertEquals(30, submission.getResult().getMatchingEvents().size());
- final Map<String, Integer> counts = new HashMap<>();
- for (final ProvenanceEventRecord match : submission.getResult().getMatchingEvents()) {
- System.out.println(match);
-
- final String index = match.getAttributes().get("iteration");
- Integer count = counts.get(index);
- if (count == null) {
- count = 0;
- }
- counts.put(index, count + 1);
- }
-
- assertEquals(3, counts.size());
- assertEquals(10, counts.get("0").intValue());
- assertEquals(10, counts.get("1").intValue());
- assertEquals(10, counts.get("2").intValue());
-
- config.setMaxRecordLife(1, TimeUnit.MILLISECONDS);
-
- repo.purgeOldEvents();
-
- Thread.sleep(2000L); // purge is async. Give it time to do its job.
-
- query.setMaxResults(100);
- final QuerySubmission noResultSubmission = repo.submitQuery(query, createUser());
- while (!noResultSubmission.getResult().isFinished()) {
- Thread.sleep(10L);
- }
-
- assertEquals(0, noResultSubmission.getResult().getTotalHitCount());
- }
-
- @Test
- public void testIndexAndCompressOnRolloverAndSubsequentEmptySearch() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(30, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- repo.registerEvent(builder.build());
- }
-
- // Give time for rollover to happen
- repo.waitForRollover();
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.setMaxResults(100);
-
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(10, result.getMatchingEvents().size());
- for (final ProvenanceEventRecord match : result.getMatchingEvents()) {
- System.out.println(match);
- }
-
- Thread.sleep(2000L);
-
- config.setMaxStorageCapacity(100L);
- config.setMaxRecordLife(500, TimeUnit.MILLISECONDS);
- repo.purgeOldEvents();
-
- Thread.sleep(1000L);
-
- final QueryResult newRecordSet = repo.queryEvents(query, createUser());
- assertTrue(newRecordSet.getMatchingEvents().isEmpty());
- }
-
- @Test
- public void testLineageReceiveDrop() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(3, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000001";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("uuid", uuid);
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- repo.registerEvent(builder.build());
-
- builder.setEventTime(System.currentTimeMillis() + 1);
- builder.setEventType(ProvenanceEventType.DROP);
- builder.setTransitUri(null);
- repo.registerEvent(builder.build());
-
- repo.waitForRollover();
-
- final Lineage lineage = repo.computeLineage(uuid, createUser());
- assertNotNull(lineage);
-
- // Nodes should consist of a RECEIVE followed by FlowFileNode, followed by a DROP
- final List<LineageNode> nodes = lineage.getNodes();
- final List<LineageEdge> edges = lineage.getEdges();
- assertEquals(3, nodes.size());
-
- for (final LineageEdge edge : edges) {
- if (edge.getSource().getNodeType() == LineageNodeType.FLOWFILE_NODE) {
- assertSame(edge.getDestination().getNodeType(), LineageNodeType.PROVENANCE_EVENT_NODE);
- assertSame(((EventNode) edge.getDestination()).getEventType(), ProvenanceEventType.DROP);
- } else {
- assertSame(((EventNode) edge.getSource()).getEventType(), ProvenanceEventType.RECEIVE);
- assertSame(edge.getDestination().getNodeType(), LineageNodeType.FLOWFILE_NODE);
- }
- }
- }
-
- @Test
- public void testLineageReceiveDropAsync() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(3, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000001";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("uuid", uuid);
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- repo.registerEvent(builder.build());
-
- builder.setEventTime(System.currentTimeMillis() + 1);
- builder.setEventType(ProvenanceEventType.DROP);
- builder.setTransitUri(null);
- repo.registerEvent(builder.build());
-
- repo.waitForRollover();
-
- final AsyncLineageSubmission submission = repo.submitLineageComputation(uuid, createUser());
- while (!submission.getResult().isFinished()) {
- Thread.sleep(100L);
- }
-
- assertNotNull(submission);
-
- // Nodes should consist of a RECEIVE followed by FlowFileNode, followed by a DROP
- final List<LineageNode> nodes = submission.getResult().getNodes();
- final List<LineageEdge> edges = submission.getResult().getEdges();
- assertEquals(3, nodes.size());
-
- for (final LineageEdge edge : edges) {
- if (edge.getSource().getNodeType() == LineageNodeType.FLOWFILE_NODE) {
- assertSame(edge.getDestination().getNodeType(), LineageNodeType.PROVENANCE_EVENT_NODE);
- assertSame(((EventNode) edge.getDestination()).getEventType(), ProvenanceEventType.DROP);
- } else {
- assertSame(((EventNode) edge.getSource()).getEventType(), ProvenanceEventType.RECEIVE);
- assertSame(edge.getDestination().getNodeType(), LineageNodeType.FLOWFILE_NODE);
- }
- }
- }
-
- @Test
- public void testLineageManyToOneSpawn() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(3, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String childId = "00000000-0000-0000-0000-000000000000";
-
- final String parentId1 = "00000000-0000-0000-0001-000000000001";
- final String parentId2 = "00000000-0000-0000-0001-000000000002";
- final String parentId3 = "00000000-0000-0000-0001-000000000003";
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("uuid", childId);
- attributes.put("filename", "file-" + childId);
-
- final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.FORK);
- attributes.put("uuid", childId);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- builder.addChildUuid(childId);
- builder.addParentUuid(parentId1);
- builder.addParentUuid(parentId2);
- builder.addParentUuid(parentId3);
-
- repo.registerEvent(builder.build());
-
- repo.waitForRollover();
-
- final Lineage lineage = repo.computeLineage(childId, createUser());
- assertNotNull(lineage);
-
- // these are not necessarily accurate asserts....
- final List<LineageNode> nodes = lineage.getNodes();
- final List<LineageEdge> edges = lineage.getEdges();
- assertEquals(2, nodes.size());
- assertEquals(1, edges.size());
- }
-
- @Test
- public void testLineageManyToOneSpawnAsync() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(3, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String childId = "00000000-0000-0000-0000-000000000000";
-
- final String parentId1 = "00000000-0000-0000-0001-000000000001";
- final String parentId2 = "00000000-0000-0000-0001-000000000002";
- final String parentId3 = "00000000-0000-0000-0001-000000000003";
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("uuid", childId);
- attributes.put("filename", "file-" + childId);
-
- final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.FORK);
- attributes.put("uuid", childId);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- builder.addChildUuid(childId);
- builder.addParentUuid(parentId1);
- builder.addParentUuid(parentId2);
- builder.addParentUuid(parentId3);
-
- repo.registerEvent(builder.build());
-
- repo.waitForRollover();
-
- final AsyncLineageSubmission submission = repo.submitLineageComputation(childId, createUser());
- while (!submission.getResult().isFinished()) {
- Thread.sleep(100L);
- }
-
- // these are not accurate asserts....
- final List<LineageNode> nodes = submission.getResult().getNodes();
- final List<LineageEdge> edges = submission.getResult().getEdges();
- assertEquals(2, nodes.size());
- assertEquals(1, edges.size());
- }
-
- @Test
- public void testCorrectProvenanceEventIdOnRestore() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(1, TimeUnit.SECONDS);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- repo.registerEvent(builder.build());
- }
-
- repo.close();
-
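- // Reopen the repository; newly registered events should continue from the previous maximum ID (10) rather than restarting at zero.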
- final PersistentProvenanceRepository secondRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- secondRepo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- try {
- final ProvenanceEventRecord event11 = builder.build();
- secondRepo.registerEvent(event11);
- secondRepo.waitForRollover();
- final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L, null);
- assertNotNull(event11Retrieved);
- assertEquals(10, event11Retrieved.getEventId());
- } finally {
- secondRepo.close();
- }
- }
-
- /**
- * Here the event file is simply corrupted by virtue of not having any event
- * records while having correct headers
- */
- @Test
- public void testWithEventFileMissingRecord() throws Exception {
- File eventFile = this.prepCorruptedEventFileTests();
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*", null));
- query.setMaxResults(100);
-
- DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
- in.writeUTF("BlahBlah");
- in.writeInt(4);
- in.close();
- assertTrue(eventFile.exists());
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(10, result.getMatchingEvents().size());
- }
-
- /**
- * Here the event file is simply corrupted by virtue of being empty (0
- * bytes)
- */
- @Test
- public void testWithEventFileCorrupted() throws Exception {
- File eventFile = this.prepCorruptedEventFileTests();
-
- final Query query = new Query(UUID.randomUUID().toString());
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "foo-*", null));
- query.setMaxResults(100);
- DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
- in.close();
- final QueryResult result = repo.queryEvents(query, createUser());
- assertEquals(10, result.getMatchingEvents().size());
- }
-
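- // Registers 20 events so that two event files are created, then deletes the second file ("10.prov.gz") so each test can recreate it with corrupted content.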
- private File prepCorruptedEventFileTests() throws Exception {
- RepositoryConfiguration config = createConfiguration();
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setDesiredIndexSize(10);
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- String uuid = UUID.randomUUID().toString();
- for (int i = 0; i < 20; i++) {
- ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
- .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
- .setFlowFileUUID(uuid).build();
- repo.registerEvent(record);
- if (i == 9) {
- repo.waitForRollover();
- Thread.sleep(2000L);
- }
- }
- repo.waitForRollover();
- File eventFile = new File(config.getStorageDirectories().values().iterator().next(), "10.prov.gz");
- assertTrue(eventFile.delete());
- return eventFile;
- }
-
- @Test
- public void testNotAuthorizedGetSpecificEvent() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(5, TimeUnit.MINUTES);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setDesiredIndexSize(10); // force new index to be created for each rollover
-
- final AccessDeniedException expectedException = new AccessDeniedException("Unit Test - Intentionally Thrown");
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- public void authorize(ProvenanceEventRecord event, NiFiUser user) {
- throw expectedException;
- }
- };
-
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- builder.setEventTime(10L); // make sure the events are destroyed when we call purge
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- assertThrows(expectedException.getClass(), () -> repo.getEvent(0, null));
- }
-
- @Test
- public void testNotAuthorizedGetEventRange() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(5, TimeUnit.MINUTES);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setDesiredIndexSize(10); // force new index to be created for each rollover
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser user) {
- return event.getEventId() > 2;
- }
- };
-
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- builder.setEventTime(10L); // make sure the events are destroyed when we call purge
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- final List<ProvenanceEventRecord> events = repo.getEvents(0L, 10, null);
-
- // Ensure that we get events with IDs 3 through 9.
- assertEquals(7, events.size());
- final List<Long> eventIds = events.stream().map(event -> event.getEventId()).sorted().collect(Collectors.toList());
- for (int i = 0; i < 7; i++) {
- assertEquals(i + 3, eventIds.get(i).intValue());
- }
- }
-
- @Test
- public void testNotAuthorizedQuery() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(5, TimeUnit.MINUTES);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setDesiredIndexSize(10); // force new index to be created for each rollover
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser user) {
- return event.getEventId() > 2;
- }
- };
-
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- builder.setEventTime(10L); // make sure the events are destroyed when we call purge
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- final Query query = new Query("1234");
- query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234", null));
- final QuerySubmission submission = repo.submitQuery(query, createUser());
-
- final QueryResult result = submission.getResult();
- while (!result.isFinished()) {
- Thread.sleep(100L);
- }
-
- // Ensure that we get events with IDs 3 through 9.
- final List<ProvenanceEventRecord> events = result.getMatchingEvents();
- assertEquals(7, events.size());
- final List<Long> eventIds = events.stream().map(event -> event.getEventId()).sorted().collect(Collectors.toList());
- for (int i = 0; i < 7; i++) {
- assertEquals(i + 3, eventIds.get(i).intValue());
- }
- }
-
- @Test
- public void testNotAuthorizedLineage() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(5, TimeUnit.MINUTES);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setDesiredIndexSize(10); // force new index to be created for each rollover
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- public boolean isAuthorized(ProvenanceEventRecord event, NiFiUser user) {
- return event.getEventType() != ProvenanceEventType.ATTRIBUTES_MODIFIED;
- }
- };
-
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
- attributes.put("uuid", uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- builder.setEventTime(10L); // make sure the events are destroyed when we call purge
-
- builder.fromFlowFile(createFlowFile(1, 3000L, attributes));
- repo.registerEvent(builder.build());
-
- builder.setEventType(ProvenanceEventType.CONTENT_MODIFIED);
- builder.fromFlowFile(createFlowFile(2, 2000L, attributes));
- repo.registerEvent(builder.build());
-
- builder.setEventType(ProvenanceEventType.CONTENT_MODIFIED);
- builder.fromFlowFile(createFlowFile(3, 2000L, attributes));
- repo.registerEvent(builder.build());
-
- builder.setEventType(ProvenanceEventType.ATTRIBUTES_MODIFIED);
- attributes.put("new-attr", "yes");
- builder.fromFlowFile(createFlowFile(4, 2000L, attributes));
- repo.registerEvent(builder.build());
-
- final Map<String, String> childAttributes = new HashMap<>(attributes);
- childAttributes.put("uuid", "00000000-0000-0000-0000-000000000001");
- builder.setEventType(ProvenanceEventType.FORK);
- builder.fromFlowFile(createFlowFile(4, 2000L, attributes));
- builder.addChildFlowFile(createFlowFile(5, 2000L, childAttributes));
- builder.addParentFlowFile(createFlowFile(4, 2000L, attributes));
- repo.registerEvent(builder.build());
-
- builder.setEventType(ProvenanceEventType.ATTRIBUTES_MODIFIED);
- builder.fromFlowFile(createFlowFile(6, 2000L, childAttributes));
- repo.registerEvent(builder.build());
-
- builder.setEventType(ProvenanceEventType.DROP);
- builder.fromFlowFile(createFlowFile(6, 2000L, childAttributes));
- repo.registerEvent(builder.build());
-
- repo.waitForRollover();
-
- final AsyncLineageSubmission originalLineage = repo.submitLineageComputation(uuid, createUser());
-
- final StandardLineageResult result = originalLineage.getResult();
- while (!result.isFinished()) {
- Thread.sleep(100L);
- }
-
- final List<LineageNode> lineageNodes = result.getNodes();
- assertEquals(6, lineageNodes.size());
-
- assertEquals(1, lineageNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.FLOWFILE_NODE).count());
- assertEquals(5, lineageNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.PROVENANCE_EVENT_NODE).count());
-
- final Set<EventNode> eventNodes = lineageNodes.stream()
- .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
- .map(node -> (EventNode) node)
- .collect(Collectors.toSet());
-
- final Map<ProvenanceEventType, List<EventNode>> nodesByType = eventNodes.stream().collect(Collectors.groupingBy(EventNode::getEventType));
- assertEquals(1, nodesByType.get(ProvenanceEventType.RECEIVE).size());
- assertEquals(2, nodesByType.get(ProvenanceEventType.CONTENT_MODIFIED).size());
- assertEquals(1, nodesByType.get(ProvenanceEventType.FORK).size());
-
- assertEquals(1, nodesByType.get(ProvenanceEventType.UNKNOWN).size());
- assertNull(nodesByType.get(ProvenanceEventType.ATTRIBUTES_MODIFIED));
-
- // Test filtering on expandChildren
- final AsyncLineageSubmission expandChild = repo.submitExpandChildren(4L, createUser());
- final StandardLineageResult expandChildResult = expandChild.getResult();
- while (!expandChildResult.isFinished()) {
- Thread.sleep(100L);
- }
-
- final List<LineageNode> expandChildNodes = expandChildResult.getNodes();
- assertEquals(4, expandChildNodes.size());
-
- assertEquals(1, expandChildNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.FLOWFILE_NODE).count());
- assertEquals(3, expandChildNodes.stream().map(node -> node.getNodeType()).filter(t -> t == LineageNodeType.PROVENANCE_EVENT_NODE).count());
-
- final Set<EventNode> childEventNodes = expandChildNodes.stream()
- .filter(node -> node.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
- .map(node -> (EventNode) node)
- .collect(Collectors.toSet());
-
- final Map<ProvenanceEventType, List<EventNode>> childNodesByType = childEventNodes.stream().collect(Collectors.groupingBy(EventNode::getEventType));
- assertEquals(1, childNodesByType.get(ProvenanceEventType.FORK).size());
- assertEquals(1, childNodesByType.get(ProvenanceEventType.DROP).size());
- assertEquals(1, childNodesByType.get(ProvenanceEventType.UNKNOWN).size());
- assertNull(childNodesByType.get(ProvenanceEventType.ATTRIBUTES_MODIFIED));
- }
-
- @Test
- public void testBackPressure() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1L); // force rollover on each record.
- config.setJournalCount(1);
-
- final AtomicInteger journalCountRef = new AtomicInteger(0);
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- protected int getJournalCount() {
- return journalCountRef.get();
- }
- };
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", UUID.randomUUID().toString());
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- // ensure that we can register the events.
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- repo.registerEvent(builder.build());
- }
-
- // set number of journals to 6 so that we will block.
- journalCountRef.set(6);
-
- final AtomicLong threadNanos = new AtomicLong(0L);
- final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- final long start = System.nanoTime();
- builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
- repo.registerEvent(builder.build());
- threadNanos.set(System.nanoTime() - start);
- }
- });
- t.start();
-
- Thread.sleep(1500L);
-
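- // Drop the reported journal count below the limit so the blocked registration thread is released.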
- journalCountRef.set(1);
- t.join();
-
- final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
- assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
-
- builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
- repo.registerEvent(builder.build());
-
- Thread.sleep(3000L);
- }
-
- @Test
- public void testTextualQuery() throws InterruptedException, IOException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-unnamed");
-
- final long now = System.currentTimeMillis();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(now - TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS));
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- if (i > 5) {
- attributes.put("filename", "file-" + i);
- builder.setEventTime(System.currentTimeMillis());
- }
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
-
- final IndexConfiguration indexConfig = new IndexConfiguration(config);
- final List<File> indexDirs = indexConfig.getIndexDirectories();
-
- final String query = "uuid:00000000-0000-0000-0000-0000000000* AND NOT filename:file-?";
- final List<Document> results = runQuery(indexDirs.get(0), new ArrayList<>(config.getStorageDirectories().values()), query);
-
- assertEquals(6, results.size());
- }
-
- private List<Document> runQuery(final File indexDirectory, final List<File> storageDirs, final String query) throws IOException, ParseException {
- try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory.toPath()))) {
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- final Analyzer analyzer = new SimpleAnalyzer();
- final org.apache.lucene.search.Query luceneQuery = new QueryParser("uuid", analyzer).parse(query);
-
- final Query q = new Query("");
- q.setMaxResults(1000);
- final TopDocs topDocs = searcher.search(luceneQuery, 1000);
-
- final List<Document> docs = new ArrayList<>();
- for (final ScoreDoc scoreDoc : topDocs.scoreDocs) {
- final int docId = scoreDoc.doc;
- final Document d = directoryReader.document(docId);
- docs.add(d);
- }
-
- return docs;
- }
- }
-
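- // Reads every merged event file in the storage directory and verifies that event IDs are sequential (exact) or monotonically increasing; returns the number of records read.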
- private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
- File[] storagefiles = storageDir.listFiles();
- long counter = 0;
- assertNotNull(storagefiles);
- for (final File file : storagefiles) {
- if (file.isFile()) {
- try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
- ProvenanceEventRecord r;
- ProvenanceEventRecord last = null;
- while ((r = reader.nextRecord()) != null) {
- if (exact) {
- assertEquals(counter++, r.getEventId());
- } else {
- assertTrue(counter++ <= r.getEventId());
- }
- }
- }
- }
- }
- return counter;
- }
-
- @Test
- public void testMergeJournals() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
-
- final ExecutorService exec = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 10000; i++) {
- exec.submit(new Runnable() {
- @Override
- public void run() {
- repo.registerEvent(record);
- }
- });
- }
-
- repo.waitForRollover();
-
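- // After rollover, the events written by ten concurrent threads should be merged into event files with contiguous event IDs.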
- final File storageDir = config.getStorageDirectories().values().iterator().next();
- long counter = 0;
- for (final File file : storageDir.listFiles()) {
- if (file.isFile()) {
-
- try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
- ProvenanceEventRecord r = null;
-
- while ((r = reader.nextRecord()) != null) {
- assertEquals(counter++, r.getEventId());
- }
- }
- }
- }
-
- assertEquals(10000, counter);
- }
-
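- // Overwrites the bytes at the given position, asserting first that they match the expected original value, to simulate a corrupted journal record.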
- private void corruptJournalFile(final File journalFile, final int position,
- final String original, final String replacement) throws IOException {
- final int journalLength = Long.valueOf(journalFile.length()).intValue();
- final byte[] origBytes = original.getBytes();
- final byte[] replBytes = replacement.getBytes();
- FileInputStream journalIn = new FileInputStream(journalFile);
- byte[] content = new byte[journalLength];
- assertEquals(journalLength, journalIn.read(content, 0, journalLength));
- journalIn.close();
- assertEquals(original, new String(Arrays.copyOfRange(content, position, position + origBytes.length)));
- System.arraycopy(replBytes, 0, content, position, replBytes.length);
- FileOutputStream journalOut = new FileOutputStream(journalFile);
- journalOut.write(content, 0, journalLength);
- journalOut.flush();
- journalOut.close();
- }
-
- @Test
- public void testMergeJournalsBadFirstRecord() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- testRepo.initialize(getEventReporter(), null, null, null);
-
- final Map<String, String> attributes = new HashMap<>();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
-
- final ExecutorService exec = Executors.newFixedThreadPool(10);
- final List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < 10000; i++) {
- futures.add(exec.submit(new Runnable() {
- @Override
- public void run() {
- testRepo.registerEvent(record);
- }
- }));
- }
-
- // wait for writers to finish and then corrupt the first record of the first journal file
- for (Future<?> future : futures) {
- while (!future.isDone()) {
- Thread.sleep(10);
- }
- }
- RecordWriter firstWriter = testRepo.getWriters()[0];
- corruptJournalFile(firstWriter.getFile(), headerSize + 15, "RECEIVE", "BADTYPE");
-
- testRepo.recoverJournalFiles();
-
- final File storageDir = config.getStorageDirectories().values().iterator().next();
- assertTrue(checkJournalRecords(storageDir, false) < 10000);
- }
-
- @Test
- public void testMergeJournalsBadRecordAfterFirst() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- testRepo.initialize(getEventReporter(), null, null, null);
-
- final Map<String, String> attributes = new HashMap<>();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
-
- final ExecutorService exec = Executors.newFixedThreadPool(10);
- final List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < 10000; i++) {
- futures.add(exec.submit(new Runnable() {
- @Override
- public void run() {
- testRepo.registerEvent(record);
- }
- }));
- }
-
- // wait for writers to finish and then corrupt the second record of the first journal file
- for (Future<?> future : futures) {
- while (!future.isDone()) {
- Thread.sleep(10);
- }
- }
- RecordWriter firstWriter = testRepo.getWriters()[0];
- corruptJournalFile(firstWriter.getFile(), headerSize + 15 + recordSize, "RECEIVE", "BADTYPE");
-
- testRepo.recoverJournalFiles();
-
- final File storageDir = config.getStorageDirectories().values().iterator().next();
- assertTrue(checkJournalRecords(storageDir, false) < 10000);
- }
-
- private boolean isWindowsEnvironment() {
- return System.getProperty("os.name").toLowerCase().startsWith("windows");
- }
-
- @Test
- public void testMergeJournalsEmptyJournal() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- testRepo.initialize(getEventReporter(), null, null, null);
-
- final Map<String, String> attributes = new HashMap<>();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
-
- final ExecutorService exec = Executors.newFixedThreadPool(10);
- final List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < config.getJournalCount() - 1; i++) {
- futures.add(exec.submit(new Runnable() {
- @Override
- public void run() {
- testRepo.registerEvent(record);
- }
- }));
- }
-
- // wait for writers to finish and then corrupt the first record of the first journal file
- for (Future<?> future : futures) {
- while (!future.isDone()) {
- Thread.sleep(10);
- }
- }
-
- testRepo.recoverJournalFiles();
-
- assertEquals(0, reportedEvents.size(), "mergeJournals() should not error on empty journal");
-
- final File storageDir = config.getStorageDirectories().values().iterator().next();
- assertEquals(config.getJournalCount() - 1, checkJournalRecords(storageDir, true));
- }
-
- @Test
- public void testRolloverRetry() throws IOException, InterruptedException {
- final AtomicInteger retryAmount = new AtomicInteger(0);
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
-
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- File mergeJournals(List<File> journalFiles, File suggestedMergeFile, EventReporter eventReporter) throws IOException {
- retryAmount.incrementAndGet();
- return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter);
- }
-
- // Indicate that there are no files available.
- @Override
- protected List<File> filterUnavailableFiles(List<File> journalFiles) {
- return Collections.emptyList();
- }
-
- @Override
- protected long getRolloverRetryMillis() {
- return 10L; // retry quickly.
- }
- };
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
-
- final ExecutorService exec = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 10000; i++) {
- exec.submit(new Runnable() {
- @Override
- public void run() {
- repo.registerEvent(record);
- }
- });
- }
- exec.shutdown();
- exec.awaitTermination(10, TimeUnit.SECONDS);
-
- repo.waitForRollover();
- assertEquals(5, retryAmount.get());
- }
-
- @Test
- public void testTruncateAttributes() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxAttributeChars(50);
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final String maxLengthChars = "12345678901234567890123456789012345678901234567890";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
- attributes.put("51chars", "123456789012345678901234567890123456789012345678901");
- attributes.put("50chars", "12345678901234567890123456789012345678901234567890");
- attributes.put("49chars", "1234567890123456789012345678901234567890123456789");
- attributes.put("nullChar", null);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
- repo.registerEvent(record);
- repo.waitForRollover();
-
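- // Attribute values over the 50-character limit should come back truncated to 50 characters; shorter values are unchanged.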
- final ProvenanceEventRecord retrieved = repo.getEvent(0L, null);
- assertNotNull(retrieved);
- assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
- assertEquals(maxLengthChars, retrieved.getAttributes().get("75chars"));
- assertEquals(maxLengthChars, retrieved.getAttributes().get("51chars"));
- assertEquals(maxLengthChars, retrieved.getAttributes().get("50chars"));
- assertEquals(maxLengthChars.substring(0, 49), retrieved.getAttributes().get("49chars"));
- }
-
- @Test
- public void testExceptionOnIndex() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxAttributeChars(50);
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- config.setIndexThreadPoolSize(1);
-
- final int numEventsToIndex = 10;
-
- final AtomicInteger indexedEventCount = new AtomicInteger(0);
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- protected synchronized IndexingAction createIndexingAction() {
- return new IndexingAction(config.getSearchableFields(), config.getSearchableAttributes()) {
- @Override
- public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
- final int count = indexedEventCount.incrementAndGet();
- if (count <= numEventsToIndex) {
- return;
- }
-
- throw new IOException("Unit Test - Intentional Exception");
- }
- };
- }
- };
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 1000; i++) {
- final ProvenanceEventRecord record = builder.build();
- repo.registerEvent(record);
- }
-
- repo.waitForRollover();
-
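- // Indexing aborts after MAX_INDEXING_FAILURE_COUNT consecutive failures, so only the first ten events plus the tolerated failures are attempted.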
- assertEquals(numEventsToIndex + PersistentProvenanceRepository.MAX_INDEXING_FAILURE_COUNT, indexedEventCount.get());
- assertEquals(1, reportedEvents.size());
- final ReportedEvent event = reportedEvents.get(0);
- assertEquals(Severity.WARNING, event.getSeverity());
- }
-
- @Test
- public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxAttributeChars(50);
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
-
- // Create a repo that will allow only a single writer to be created.
- final IOException failure = new IOException("Already created writers once. Unit test causing failure.");
- repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- int iterations = 0;
-
- @Override
- protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
- if (iterations++ == 1) {
- throw failure;
- } else {
- return super.createWriters(config, initialRecordId);
- }
- }
- };
-
- // initialize with our event reporter
- repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
-
- // create some events in the journal files.
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 50; i++) {
- final ProvenanceEventRecord event = builder.build();
- repo.registerEvent(event);
- }
-
- // Attempt to rollover but fail to create new writers.
- assertThrows(IOException.class, () -> repo.rolloverWithLock(true));
-
- // Wait for the first rollover to succeed.
- repo.waitForRollover();
-
- // This time when we rollover, we should not have a problem rolling over.
- repo.rolloverWithLock(true);
-
- // Ensure that no errors were reported.
- assertEquals(0, reportedEvents.size());
- }
-
- private static class ReportedEvent {
-
- private final Severity severity;
- private final String category;
- private final String message;
-
- public ReportedEvent(final Severity severity, final String category, final String message) {
- this.severity = severity;
- this.category = category;
- this.message = message;
- }
-
- @SuppressWarnings("unused")
- public String getCategory() {
- return category;
- }
-
- @SuppressWarnings("unused")
- public String getMessage() {
- return message;
- }
-
- public Severity getSeverity() {
- return severity;
- }
- }
-
- private NiFiUser createUser() {
- return new NiFiUser() {
- @Override
- public String getIdentity() {
- return "unit-test";
- }
-
- @Override
- public Set<String> getGroups() {
- return Collections.EMPTY_SET;
- }
-
- @Override
- public Set<String> getIdentityProviderGroups() {
- return Collections.EMPTY_SET;
- }
-
- @Override
- public Set<String> getAllGroups() {
- return Collections.EMPTY_SET;
- }
-
- @Override
- public NiFiUser getChain() {
- return null;
- }
-
- @Override
- public boolean isAnonymous() {
- return false;
- }
-
- @Override
- public String getClientAddress() {
- return null;
- }
-
- };
- }
-
- private static class TestablePersistentProvenanceRepository extends PersistentProvenanceRepository {
-
- TestablePersistentProvenanceRepository() {
- super();
- }
-
- TestablePersistentProvenanceRepository(final NiFiProperties nifiProperties) throws IOException {
- super(nifiProperties);
- }
-
- TestablePersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException {
- super(configuration, rolloverCheckMillis);
- }
-
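- // Reflection helpers expose private repository state so that tests can corrupt journal files and inspect the rollover configuration directly.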
- RecordWriter[] getWriters() {
- Class<?> klass = PersistentProvenanceRepository.class;
- Field writersField;
- RecordWriter[] writers = null;
- try {
- writersField = klass.getDeclaredField("writers");
- writersField.setAccessible(true);
- writers = (RecordWriter[]) writersField.get(this);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- e.printStackTrace();
- }
- return writers;
- }
-
- int getRolloverCheckMillis() {
- Class<?> klass = PersistentProvenanceRepository.class;
- java.lang.reflect.Field rolloverCheckMillisField;
- int rolloverCheckMillis = -1;
- try {
- rolloverCheckMillisField = klass.getDeclaredField("rolloverCheckMillis");
- rolloverCheckMillisField.setAccessible(true);
- rolloverCheckMillis = (int) rolloverCheckMillisField.get(this);
- } catch (NoSuchFieldException | IllegalAccessException e) {
- e.printStackTrace();
- }
- return rolloverCheckMillis;
- }
-
- }
-
- private RepositoryConfiguration createTestableRepositoryConfiguration(final NiFiProperties properties) {
- Class<?> klass = PersistentProvenanceRepository.class;
- Method createRepositoryConfigurationMethod;
- RepositoryConfiguration configuration = null;
- try {
- createRepositoryConfigurationMethod = klass.getDeclaredMethod("createRepositoryConfiguration", NiFiProperties.class);
- createRepositoryConfigurationMethod.setAccessible(true);
- configuration = (RepositoryConfiguration) createRepositoryConfigurationMethod.invoke(null, properties);
- } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
- e.printStackTrace();
- }
- return configuration;
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
deleted file mode 100644
index c4660d531c..0000000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.provenance.toc.NopTocWriter;
-import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
-
-@SuppressWarnings("deprecation")
-public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
- private final AtomicLong idGenerator = new AtomicLong(0L);
-
- @BeforeEach
- public void resetIds() {
- idGenerator.set(0L);
- }
-
- @Test
- @EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
- public void testWritePerformance() throws IOException {
- // This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
- // making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
- // on performance of the repository.
- final ProvenanceEventRecord event = createEvent();
-
- final TocWriter tocWriter = new NopTocWriter();
-
- final int numEvents = 10_000_000;
- final long startNanos = System.nanoTime();
- try (final OutputStream nullOut = new NullOutputStream();
- final RecordWriter writer = new StandardRecordWriter(nullOut, "devnull", idGenerator, tocWriter, false, 100000)) {
-
- writer.writeHeader(0L);
-
- for (int i = 0; i < numEvents; i++) {
- writer.writeRecord(event);
- }
- }
-
- final long nanos = System.nanoTime() - startNanos;
- final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
- System.out.println("Took " + millis + " millis to write " + numEvents + " events");
- }
-
- @Test
- @EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
- public void testReadPerformance() throws IOException {
- // This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
- // making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
- // on performance of the repository.
- final ProvenanceEventRecord event = createEvent();
-
- final byte[] header;
- try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(headerOut)) {
- out.writeUTF(PersistentProvenanceRepository.class.getName());
- out.writeInt(9);
- header = headerOut.toByteArray();
- }
-
- final byte[] serializedRecord;
- try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
- final StandardRecordWriter writer = new StandardRecordWriter(headerOut, "devnull", idGenerator, null, false, 0)) {
-
- writer.writeHeader(1L);
- headerOut.reset();
-
- writer.writeRecord(event);
- writer.flush();
- serializedRecord = headerOut.toByteArray();
- }
-
- final int numEvents = 10_000_000;
- final long startNanos = System.nanoTime();
- try (final InputStream in = new LoopingInputStream(header, serializedRecord);
- final RecordReader reader = new StandardRecordReader(in, "filename", null, 100000)) {
-
- for (int i = 0; i < numEvents; i++) {
- reader.nextRecord();
- }
- }
-
- final long nanos = System.nanoTime() - startNanos;
- final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
- System.out.println("Took " + millis + " millis to read " + numEvents + " events");
- }
-
- @Test
- public void testWriteUtfLargerThan64k() throws IOException {
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("filename", "1.txt");
- attributes.put("uuid", UUID.randomUUID().toString());
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- final String seventyK = StringUtils.repeat("X", 70000);
- assertTrue(seventyK.length() > 65535);
- assertTrue(seventyK.getBytes(StandardCharsets.UTF_8).length > 65535);
- builder.setDetails(seventyK);
- final ProvenanceEventRecord record = builder.build();
-
- try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(headerOut)) {
- out.writeUTF(PersistentProvenanceRepository.class.getName());
- out.writeInt(9);
- }
-
- try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream();
- final StandardRecordWriter writer = new StandardRecordWriter(recordOut, "devnull", idGenerator, null, false, 0)) {
-
- writer.writeHeader(1L);
- recordOut.reset();
-
- writer.writeRecord(record);
- }
- }
-
- @Override
- protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
- return new StandardRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize);
- }
-
- @Override
- protected RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException {
- return new StandardRecordReader(in, journalFilename, tocReader, maxAttributeSize);
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml
index 1d1aeb8984..a589dabdd5 100644
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml
+++ b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/pom.xml
@@ -43,6 +43,10 @@
<version>1.18.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-deprecation-log</artifactId>
+ </dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
diff --git a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
index b28783e709..ede2429152 100644
--- a/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-rocksdb-bundle/nifi-rocksdb-repository/src/main/java/org/apache/nifi/controller/repository/RocksDBFlowFileRepository.java
@@ -21,6 +21,8 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -78,6 +80,7 @@ import java.util.stream.Collectors;
public class RocksDBFlowFileRepository implements FlowFileRepository {
private static final Logger logger = LoggerFactory.getLogger(RocksDBFlowFileRepository.class);
+ private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(RocksDBFlowFileRepository.class);
private static final String FLOWFILE_PROPERTY_PREFIX = "nifi.flowfile.repository.";
private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = FLOWFILE_PROPERTY_PREFIX + "directory";
@@ -286,7 +289,10 @@ public class RocksDBFlowFileRepository implements FlowFileRepository {
}
public RocksDBFlowFileRepository(final NiFiProperties niFiProperties) {
- logger.warn("*** " + RocksDBFlowFileRepository.class.getSimpleName() + " is deprecated and will be removed in future versions of Apache NiFi. ***");
+ deprecationLogger.warn("{} should be replaced with WriteAheadFlowFileRepository for [{}] in nifi.properties",
+ getClass().getSimpleName(),
+ NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION
+ );
deserializationThreads = RocksDbProperty.DESERIALIZATION_THREADS.getIntValue(niFiProperties);
deserializationBufferSize = RocksDbProperty.DESERIALIZATION_BUFFER_SIZE.getIntValue(niFiProperties);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 1b3122cb65..0ad4952d62 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -34,6 +34,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-expression-language</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-deprecation-log</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-put-pattern</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index 4f25e44aec..47c6037b20 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -95,6 +95,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -630,6 +632,8 @@ public class InvokeHTTP extends AbstractProcessor {
private static final String MULTIPLE_HEADER_DELIMITER = ", ";
+ private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(InvokeHTTP.class);
+
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
private volatile Pattern requestHeaderAttributesPattern = null;
@@ -704,6 +708,15 @@ public class InvokeHTTP extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(3);
final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
+ if (proxyHostSet) {
+ deprecationLogger.warn("{}[id={}] [{}] Property should be replaced with [{}] Property",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ PROXY_HOST.getDisplayName(),
+ PROXY_CONFIGURATION_SERVICE.getDisplayName()
+ );
+ }
+
final boolean proxyPortSet = validationContext.getProperty(PROXY_PORT).isSet();
if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
index e107fa6856..d166a53172 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
@@ -32,5 +32,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-deprecation-log</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
index 3d16f264d2..7355cb9878 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.Map;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -50,7 +52,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
*
*/
public interface RecordSetWriterFactory extends ControllerService {
-
/**
*
* Returns the Schema that will be used for writing Records. The given variables are
@@ -82,6 +83,15 @@ public interface RecordSetWriterFactory extends ControllerService {
*/
@Deprecated
default RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
+ final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
+ final String deprecatedMethod = "createWriter(ComponentLog, RecordSchema, OutputStream)";
+ final String replacementMethod = "createWriter(ComponentLog, RecordSchema, OutputStream, FlowFile)";
+ deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ deprecatedMethod,
+ replacementMethod
+ );
return createWriter(logger, schema, out, Collections.emptyMap());
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
index f298bda246..e108f421fc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
@@ -31,5 +31,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-deprecation-log</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
index e785a70a6d..ceb3211bd7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
@@ -17,6 +17,8 @@
package org.apache.nifi.schemaregistry.services;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
@@ -43,6 +45,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given name
*/
default String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException {
+ final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
+ final String deprecatedMethod = "retrieveSchemaText(schemaName)";
+ final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
+ deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ deprecatedMethod,
+ replacementMethod
+ );
+
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
@@ -63,6 +75,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
*/
default String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException {
+ final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
+ final String deprecatedMethod = "retrieveSchemaText(schemaId, version)";
+ final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
+ deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ deprecatedMethod,
+ replacementMethod
+ );
+
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
@@ -82,6 +104,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given name
*/
default RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException {
+ final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
+ final String deprecatedMethod = "retrieveSchemaText(schemaName)";
+ final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
+ deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ deprecatedMethod,
+ replacementMethod
+ );
+
return retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
}
@@ -104,6 +136,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
*/
default RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException {
+ final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
+ final String deprecatedMethod = "retrieveSchemaText(schemaId, version)";
+ final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
+ deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
+ getClass().getSimpleName(),
+ getIdentifier(),
+ deprecatedMethod,
+ replacementMethod
+ );
+
return retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/pom.xml
index 047db20460..335d4c51b1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/pom.xml
@@ -35,6 +35,10 @@
             <artifactId>nifi-security-utils</artifactId>
             <version>1.18.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-deprecation-log</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
index d4d2f6cd85..653df19700 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java
@@ -30,6 +30,8 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.deprecation.log.DeprecationLogger;
+import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@@ -128,6 +130,8 @@ public class StandardSSLContextService extends AbstractControllerService impleme
.sensitive(false)
.build();
+ private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(StandardSSLContextService.class);
+
private static final List<PropertyDescriptor> properties;
protected ConfigurationContext configContext;
private boolean isValidated;
@@ -269,6 +273,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme
@Deprecated
@Override
public SSLContext createSSLContext(final org.apache.nifi.security.util.ClientAuth clientAuth) throws ProcessException {
+ deprecationLogger.warn("{}[id={}] createSSLContext() should be replaced with createContext()", getClass().getSimpleName(), getIdentifier());
return createContext();
}
@@ -285,6 +290,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme
@Deprecated
@Override
public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException {
+ deprecationLogger.warn("{}[id={}] createSSLContext() should be replaced with createContext()", getClass().getSimpleName(), getIdentifier());
return createContext();
}
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index a0c138216d..718be12d4c 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -381,6 +381,11 @@
                <scope>provided</scope>
            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-deprecation-log</artifactId>
+                <version>1.18.0-SNAPSHOT</version>
+            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>