NIFI-10471 Documented and applied standard deprecation logging

- Updated Admin Guide with section on Logging Configuration and Deprecation Logging
- Updated Developer Guide with section on Deprecating Components and Features
- Added Deprecation Logging when adding components marked with DeprecationNotice to the Flow Configuration
- Added Deprecation Logging on deprecated methods in standard Controller Service implementations
- Removed integration tests for deprecated PersistentProvenanceRepository
- Updated logging and added documentation on minor upgrade steps
- Updated logging for HDFSNarProvider and RocksDB Repository

This closes #6390
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
exceptionfactory 2022-09-09 16:37:53 -05:00 committed by Paul Grey
parent e26aa233d7
commit 04c0df1723
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
23 changed files with 309 additions and 2256 deletions

View File

@ -182,6 +182,49 @@ Antivirus software can take a long time to scan large directories and the numero
* `provenance_repository`
* `state`
[[logging_configuration]]
== Logging Configuration
NiFi uses link:https://logback.qos.ch/[logback^] as the runtime logging implementation. The `conf` directory contains a
standard `logback.xml` configuration with default appender and level settings. The
link:https://logback.qos.ch/manual/index.html[logback manual] provides a complete reference of available options.
=== Standard Log Files
The standard logback configuration includes the following appender definitions and associated log files:
[options="header"]
|=========================
| File | Description
| `nifi-app.log` | Application log containing framework and component messages
| `nifi-bootstrap.log` | Bootstrap log containing startup and shutdown messages
| `nifi-deprecation.log` | Deprecation log containing warnings for deprecated components and features
| `nifi-request.log` | HTTP request log containing user interface and REST API access messages
| `nifi-user.log` | User log containing authentication and authorization messages
|=========================
=== Deprecation Logging
The `nifi-deprecation.log` contains warning messages describing components and features that will be removed in
subsequent versions. Deprecation warnings should be evaluated and addressed to avoid breaking changes when upgrading to
a new major version. Resolving deprecation warnings involves upgrading to new components, changing component property
settings, or refactoring custom component classes.
Deprecation logging provides a method for checking compatibility before upgrading from one major release version to
another. Upgrading to the latest minor release version will provide the most accurate set of deprecation warnings.
It is important to note that deprecation logging applies to both components and features. Logging for deprecated
features requires a runtime reference to the property or method impacted. Disabled components with deprecated properties
or methods will not generate deprecation logs. For this reason, it is important to exercise all configured components
long enough to exercise standard flow behavior.
Deprecation logging can generate repeated messages depending on component configuration and usage patterns. Disabling
deprecation logging for a specific component class can be configured by adding a `logger` element to `logback.xml`.
The `name` attribute must start with `deprecation`, followed by the component class. Setting the `level` attribute to
`OFF` disables deprecation logging for the component specified.
[source, xml]
----
<logger name="deprecation.org.apache.nifi.processors.ListenLegacyProtocol" level="OFF" />
----
[[security_configuration]]
== Security Configuration

View File

@ -2637,19 +2637,83 @@ logged to help avoid this bad practice.
[[deprecation]]
== Deprecating a Component
== Deprecating Components and Features
Deprecating features is an important part of the software development lifecycle, providing an upgrade path for
developers and users to follow.
Apache NiFi follows the link:https://semver.org[Semantic Versioning Specification 2.0.0^] for features identified as
part of the public version contract according to
link:https://cwiki.apache.org/confluence/display/NIFI/Version+Scheme+and+API+Compatibility[Version Scheme]
documentation.
Components and features that fit under the public version contract require deprecation marking prior to removal. This
approach allows developers to implement changes as part of minor version upgrades, in preparation for future removal
of features in a subsequent major version.
=== Component Deprecation
Sometimes it may be desirable to deprecate a component. Whenever this occurs the developer may use the
`@DeprecationNotice` annotation to indicate that a component has been deprecated, allowing the developer
to describe a reason for the deprecation and suggest alternative components. An example of how to do this can
to describe a `reason` for the deprecation and suggest alternative components. An example of how to do this can
be found below:
[source, java]
----
@DeprecationNotice(alternatives = {ListenSyslog.class}, classNames = {"org.apache.nifi.processors.standard.ListenRELP"}, reason = "Technology has been superseded", )
public class ListenOldProtocol extends AbstractProcessor {
@DeprecationNotice(
reason = "Legacy Protocol has been superseded",
alternatives = {ListenSyslog.class},
classNames = {"org.apache.nifi.processors.standard.ListenRELP"}
)
public class ListenLegacyProtocol extends AbstractProcessor {}
----
The `alternatives` property can be used to define an array of recommended replacement Components, while `classNames`
can be used to represent similar content through an array of class name strings.
Adding the `@DeprecationNotice` annotation renders a warning message in generated documentation and also logs the
following warning when the Flow Configuration includes the component:
----
Added Deprecated Component ListenLegacyProtocol[id=929a52c7-1e3e-423e-b303-6ca2ef657617] See alternatives [ListenSyslog,ListenRELP]
----
=== Feature Deprecation
Deprecating features includes changing component configuration strategies, introducing new repository classes, and
refactoring a Controller Service interface. Removing component properties can create invalid Flow Configurations after
upgrading, and removing public methods from a Controller Service interface can break components compiled against
previous versions. For these reasons, introducing new properties and methods must include a deprecation strategy that
supports compatibility when upgrading from one minor version to another.
Annotating methods and properties with the Java `@Deprecated` annotation provides a warning to software developers, but
does not provide any information to users. Adding selective deprecation logging provides a method for users to determine
whether the current Flow Configuration uses deprecated properties or features.
=== Deprecation Logging
The `nifi-deprecation-log` module provides an abstraction for logging deprecation warnings. The `DeprecationLogger`
interface has a `warn()` method that accepts a message string with a variable number of arguments. The standard
implementation delegates to SLF4J, supporting standard features for providing and resolving placeholder arguments. The
standard `DeprecationLogger` implementation also logs a `DeprecationException` with the referenced class, supporting
tracing to the usage location.
Instantiating a `DeprecationLogger` follows the same conventions as instantiating an SLF4J `Logger`:
[source, java]
----
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(Service.class);
----
Logging warnings for deprecated extension component properties and methods should include the simple class name and the
component identifier for tracing. Warning messages should also reference recommended alternatives.
[source, java]
----
deprecationLogger.warn(
"{}[id={}] createServiceContext() should be replaced with createContext()",
getClass().getSimpleName(),
getIdentifier()
);
----
As you can see, the alternatives can be used to define and array of alternative Components, while classNames can be
used to represent the similar content through an array of strings.
[[build]]
== Build Options

View File

@ -214,8 +214,8 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
RecordReader reader = readerFactory.createRecordReader(input, is, getLogger());
RecordSchema schema = writerFactory.getSchema(input.getAttributes(), reader.getSchema());
RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os);
RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), schema, osNotFound) : null;
RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, os, input);
RecordSetWriter notFoundWriter = splitOutput ? writerFactory.createWriter(getLogger(), schema, osNotFound, input) : null;
Record record;
Relationship targetRelationship = REL_NOT_FOUND;
writer.beginRecordSet();

View File

@ -43,6 +43,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-components</artifactId>

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.flow;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@ -24,6 +25,7 @@ import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.util.IdentityMappingUtil;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@ -52,6 +54,8 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
@ -66,6 +70,8 @@ import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.PublicPort;
@ -73,6 +79,7 @@ import org.apache.nifi.remote.StandardPublicPort;
import org.apache.nifi.remote.StandardRemoteProcessGroup;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
@ -82,6 +89,7 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@ -338,8 +346,10 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
}
if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getProcessor().getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, procNode.getProcessor());
final Processor processor = procNode.getProcessor();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
logDeprecationNotice(processor);
} catch (final Exception e) {
if (registerLogObserver) {
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
@ -385,17 +395,19 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
LogRepositoryFactory.getRepository(taskNode.getIdentifier()).setLogger(taskNode.getLogger());
if (firstTimeAdded) {
final Class<?> taskClass = taskNode.getReportingTask().getClass();
final String identifier = taskNode.getReportingTask().getIdentifier();
final ReportingTask reportingTask = taskNode.getReportingTask();
final Class<? extends ConfigurableComponent> taskClass = reportingTask.getClass();
final String identifier = reportingTask.getIdentifier();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), taskClass, identifier)) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, taskNode.getReportingTask());
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, reportingTask);
logDeprecationNotice(reportingTask);
if (flowController.isInitialized()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, reportingTask, taskNode.getConfigurationContext());
}
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + taskNode.getReportingTask(), e);
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + reportingTask, e);
}
}
@ -440,17 +452,19 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
LogRepositoryFactory.getRepository(parameterProviderNode.getIdentifier()).setLogger(parameterProviderNode.getLogger());
if (firstTimeAdded) {
final Class<?> taskClass = parameterProviderNode.getParameterProvider().getClass();
final String identifier = parameterProviderNode.getParameterProvider().getIdentifier();
final ParameterProvider parameterProvider = parameterProviderNode.getParameterProvider();
final Class<? extends ConfigurableComponent> parameterProviderClass = parameterProvider.getClass();
final String identifier = parameterProvider.getIdentifier();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), taskClass, identifier)) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, parameterProviderNode.getParameterProvider());
try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), parameterProviderClass, identifier)) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, parameterProvider);
logDeprecationNotice(parameterProvider);
if (flowController.isInitialized()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, parameterProviderNode.getParameterProvider());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, parameterProvider);
}
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + parameterProviderNode.getParameterProvider(), e);
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + parameterProvider, e);
}
}
@ -561,6 +575,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
final ControllerService serviceImpl = serviceNode.getControllerServiceImplementation();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, serviceImpl.getClass(), serviceImpl.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, serviceImpl);
logDeprecationNotice(serviceImpl);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + serviceImpl, e);
}
@ -586,5 +601,26 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
return flowController;
}
private void logDeprecationNotice(final ConfigurableComponent component) {
final Class<? extends ConfigurableComponent> componentClass = component.getClass();
final DeprecationNotice deprecationNotice = componentClass.getAnnotation(DeprecationNotice.class);
if (deprecationNotice != null) {
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(componentClass);
final List<String> alternatives = new ArrayList<>();
for (final Class<? extends ConfigurableComponent> alternativeClass : deprecationNotice.alternatives()) {
alternatives.add(alternativeClass.getSimpleName());
}
for (final String className : deprecationNotice.classNames()) {
alternatives.add(className);
}
deprecationLogger.warn("Added Deprecated Component {}[id={}] See alternatives {}",
componentClass.getSimpleName(),
component.getIdentifier(),
alternatives
);
}
}
}

View File

@ -27,6 +27,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>

View File

@ -17,6 +17,8 @@
package org.apache.nifi.nar.hadoop;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flow.resource.ImmutableExternalResourceDescriptor;
import org.apache.nifi.flow.resource.NarProviderAdapterInitializationContext;
import org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider;
@ -31,9 +33,16 @@ import java.util.stream.Collectors;
@Deprecated
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class HDFSNarProvider extends HDFSExternalResourceProvider implements NarProvider {
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(HDFSNarProvider.class);
private static final String IMPLEMENTATION_PROPERTY = "nifi.nar.library.provider.hdfs.implementation";
@Override
public void initialize(final NarProviderInitializationContext context) {
deprecationLogger.warn("{} should be replaced with HDFSExternalResourceProvider for [{}] in nifi.properties",
getClass().getSimpleName(),
IMPLEMENTATION_PROPERTY
);
initialize(new NarProviderAdapterInitializationContext(context));
}

View File

@ -41,6 +41,8 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -227,6 +229,8 @@ public class ListHDFS extends AbstractHadoopProcessor {
.description("All FlowFiles are transferred to this relationship")
.build();
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(ListHDFS.class);
private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L;
private volatile long lastRunTimestamp = -1L;
@ -277,6 +281,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
if (context.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet()) {
deprecationLogger.warn("{}[id={}] [{}] Property is not used",
getClass().getSimpleName(),
getIdentifier(),
DISTRIBUTED_CACHE_SERVICE.getDisplayName()
);
}
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);

View File

@ -30,6 +30,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-data-provenance-utils</artifactId>

View File

@ -31,6 +31,8 @@ import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.expiration.ExpirationAction;
@ -146,6 +148,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
private static final float ROLLOVER_HIGH_WATER = 0.99f;
private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(PersistentProvenanceRepository.class);
private final long maxPartitionMillis;
private final long maxPartitionBytes;
@ -226,6 +229,11 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
public PersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException {
deprecationLogger.warn("{} should be replaced with WriteAheadProvenanceRepository for [{}] in nifi.properties",
getClass().getSimpleName(),
NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS
);
if (configuration.getStorageDirectories().isEmpty()) {
throw new IllegalArgumentException("Must specify at least one storage directory");
}

View File

@ -1,170 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import static org.apache.nifi.provenance.TestUtil.createFlowFile;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.NopTocWriter;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocWriter;
import org.apache.nifi.stream.io.NullOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
@SuppressWarnings("deprecation")
public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
private final AtomicLong idGenerator = new AtomicLong(0L);
@BeforeEach
public void resetIds() {
idGenerator.set(0L);
}
@Test
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
public void testWritePerformance() throws IOException {
// This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
// making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
// on performance of the repository.
final ProvenanceEventRecord event = createEvent();
final TocWriter tocWriter = new NopTocWriter();
final int numEvents = 10_000_000;
final long startNanos = System.nanoTime();
try (final OutputStream nullOut = new NullOutputStream();
final RecordWriter writer = new StandardRecordWriter(nullOut, "devnull", idGenerator, tocWriter, false, 100000)) {
writer.writeHeader(0L);
for (int i = 0; i < numEvents; i++) {
writer.writeRecord(event);
}
}
final long nanos = System.nanoTime() - startNanos;
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
System.out.println("Took " + millis + " millis to write " + numEvents + " events");
}
@Test
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
public void testReadPerformance() throws IOException {
// This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
// making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
// on performance of the repository.
final ProvenanceEventRecord event = createEvent();
final byte[] header;
try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(headerOut)) {
out.writeUTF(PersistentProvenanceRepository.class.getName());
out.writeInt(9);
header = headerOut.toByteArray();
}
final byte[] serializedRecord;
try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
final StandardRecordWriter writer = new StandardRecordWriter(headerOut, "devnull", idGenerator, null, false, 0)) {
writer.writeHeader(1L);
headerOut.reset();
writer.writeRecord(event);
writer.flush();
serializedRecord = headerOut.toByteArray();
}
final int numEvents = 10_000_000;
final long startNanos = System.nanoTime();
try (final InputStream in = new LoopingInputStream(header, serializedRecord);
final RecordReader reader = new StandardRecordReader(in, "filename", null, 100000)) {
for (int i = 0; i < numEvents; i++) {
reader.nextRecord();
}
}
final long nanos = System.nanoTime() - startNanos;
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
System.out.println("Took " + millis + " millis to read " + numEvents + " events");
}
@Test
public void testWriteUtfLargerThan64k() throws IOException {
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", "1.txt");
attributes.put("uuid", UUID.randomUUID().toString());
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
final String seventyK = StringUtils.repeat("X", 70000);
assertTrue(seventyK.length() > 65535);
assertTrue(seventyK.getBytes(StandardCharsets.UTF_8).length > 65535);
builder.setDetails(seventyK);
final ProvenanceEventRecord record = builder.build();
try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(headerOut)) {
out.writeUTF(PersistentProvenanceRepository.class.getName());
out.writeInt(9);
}
try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream();
final StandardRecordWriter writer = new StandardRecordWriter(recordOut, "devnull", idGenerator, null, false, 0)) {
writer.writeHeader(1L);
recordOut.reset();
writer.writeRecord(record);
}
}
@Override
protected RecordWriter createWriter(File file, TocWriter tocWriter, boolean compressed, int uncompressedBlockSize) throws IOException {
return new StandardRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize);
}
@Override
protected RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException {
return new StandardRecordReader(in, journalFilename, tocReader, maxAttributeSize);
}
}

View File

@ -43,6 +43,10 @@
<version>1.18.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>

View File

@ -21,6 +21,8 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@ -78,6 +80,7 @@ import java.util.stream.Collectors;
public class RocksDBFlowFileRepository implements FlowFileRepository {
private static final Logger logger = LoggerFactory.getLogger(RocksDBFlowFileRepository.class);
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(RocksDBFlowFileRepository.class);
private static final String FLOWFILE_PROPERTY_PREFIX = "nifi.flowfile.repository.";
private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = FLOWFILE_PROPERTY_PREFIX + "directory";
@ -286,7 +289,10 @@ public class RocksDBFlowFileRepository implements FlowFileRepository {
}
public RocksDBFlowFileRepository(final NiFiProperties niFiProperties) {
logger.warn("*** " + RocksDBFlowFileRepository.class.getSimpleName() + " is deprecated and will be removed in future versions of Apache NiFi. ***");
deprecationLogger.warn("{} should be replaced with WriteAheadFlowFileRepository for [{}] in nifi.properties",
getClass().getSimpleName(),
NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION
);
deserializationThreads = RocksDbProperty.DESERIALIZATION_THREADS.getIntValue(niFiProperties);
deserializationBufferSize = RocksDbProperty.DESERIALIZATION_BUFFER_SIZE.getIntValue(niFiProperties);

View File

@ -34,6 +34,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-put-pattern</artifactId>

View File

@ -95,6 +95,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -630,6 +632,8 @@ public class InvokeHTTP extends AbstractProcessor {
private static final String MULTIPLE_HEADER_DELIMITER = ", ";
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(InvokeHTTP.class);
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
private volatile Pattern requestHeaderAttributesPattern = null;
@ -704,6 +708,15 @@ public class InvokeHTTP extends AbstractProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(3);
final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
if (proxyHostSet) {
deprecationLogger.warn("{}[id={}] [{}] Property should be replaced with [{}] Property",
getClass().getSimpleName(),
getIdentifier(),
PROXY_HOST.getDisplayName(),
PROXY_CONFIGURATION_SERVICE.getDisplayName()
);
}
final boolean proxyPortSet = validationContext.getProperty(PROXY_PORT).isSet();
if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {

View File

@ -32,5 +32,9 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.Map;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -50,7 +52,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
* </p>
*/
public interface RecordSetWriterFactory extends ControllerService {
/**
* <p>
* Returns the Schema that will be used for writing Records. The given variables are
@ -82,6 +83,15 @@ public interface RecordSetWriterFactory extends ControllerService {
*/
@Deprecated
default RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException {
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
final String deprecatedMethod = "createWriter(ComponentLog, RecordSchema, OutputStream)";
final String replacementMethod = "createWriter(ComponentLog, RecordSchema, OutputStream, FlowFile)";
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
getClass().getSimpleName(),
getIdentifier(),
deprecatedMethod,
replacementMethod
);
return createWriter(logger, schema, out, Collections.emptyMap());
}

View File

@ -31,5 +31,9 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -17,6 +17,8 @@
package org.apache.nifi.schemaregistry.services;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
@ -43,6 +45,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given name
*/
default String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException {
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
final String deprecatedMethod = "retrieveSchemaText(schemaName)";
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
getClass().getSimpleName(),
getIdentifier(),
deprecatedMethod,
replacementMethod
);
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find schema with name '" + schemaName + "'");
@ -63,6 +75,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
*/
default String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException {
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
final String deprecatedMethod = "retrieveSchemaText(schemaId, version)";
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
getClass().getSimpleName(),
getIdentifier(),
deprecatedMethod,
replacementMethod
);
final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
if (recordSchema == null) {
throw new SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'");
@ -82,6 +104,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given name
*/
default RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException {
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
final String deprecatedMethod = "retrieveSchemaText(schemaName)";
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
getClass().getSimpleName(),
getIdentifier(),
deprecatedMethod,
replacementMethod
);
return retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
}
@ -104,6 +136,16 @@ public interface SchemaRegistry extends ControllerService {
* @throws SchemaNotFoundException if unable to find the schema with the given id and version
*/
default RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException {
final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(getClass());
final String deprecatedMethod = "retrieveSchemaText(schemaId, version)";
final String replacementMethod = "retrieveSchema(SchemaIdentifier)";
deprecationLogger.warn("{}[id={}] {} should be replaced with {}",
getClass().getSimpleName(),
getIdentifier(),
deprecatedMethod,
replacementMethod
);
return retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
}

View File

@ -35,6 +35,10 @@
<artifactId>nifi-security-utils</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>

View File

@ -30,6 +30,8 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@ -128,6 +130,8 @@ public class StandardSSLContextService extends AbstractControllerService impleme
.sensitive(false)
.build();
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(StandardSSLContextService.class);
private static final List<PropertyDescriptor> properties;
protected ConfigurationContext configContext;
private boolean isValidated;
@ -269,6 +273,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme
@Deprecated
@Override
public SSLContext createSSLContext(final org.apache.nifi.security.util.ClientAuth clientAuth) throws ProcessException {
deprecationLogger.warn("{}[id={}] createSSLContext() should be replaced with createContext()", getClass().getSimpleName(), getIdentifier());
return createContext();
}
@ -285,6 +290,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme
@Deprecated
@Override
public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException {
deprecationLogger.warn("{}[id={}] createSSLContext() should be replaced with createContext()", getClass().getSimpleName(), getIdentifier());
return createContext();
}

View File

@ -381,6 +381,11 @@
<scope>provided</scope>
</dependency>
<!-- Managed dependency versions applicable to all modules -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-deprecation-log</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>