NIFI-9610 Refactored nifi-processor-utils to separate modules

- Added nifi-bin-manager
- Added nifi-event-listen
- Added nifi-event-put
- Added nifi-listed-entity
- Added nifi-put-pattern
- Removed nifi-processor-utils

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5729.
This commit is contained in:
exceptionfactory 2022-01-31 15:06:13 -06:00 committed by Nathan Gough
parent 7afb34ab39
commit dc7d9510cd
65 changed files with 301 additions and 654 deletions

View File

@ -35,7 +35,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-listed-entity</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
@ -49,6 +49,14 @@
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>

View File

@ -165,6 +165,11 @@
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version> <version>${jackson.version}</version>
</dependency> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
@ -220,7 +225,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-listed-entity</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -32,7 +32,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-event-listen</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -0,0 +1,36 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<!-- One of the modules added by NIFI-9610 when nifi-processor-utils was split into separate modules. -->
<artifactId>nifi-bin-manager</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- No version is declared for nifi-api here; presumably it is managed by an ancestor POM (confirm). -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<!-- Version is written out explicitly and equals the parent version above; change both together. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,61 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<!-- One of the modules added by NIFI-9610 when nifi-processor-utils was split into separate modules. -->
<artifactId>nifi-event-listen</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- No version is declared for nifi-api here; presumably it is managed by an ancestor POM (confirm). -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<!-- NiFi modules below carry an explicit version equal to the parent version above; change them together. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-socket-ssl</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<!-- Third-party library versions are pinned in this module rather than inherited. -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- Test scope only; no version declared here, presumably managed by an ancestor POM (confirm). -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,46 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-extension-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</parent>
<!-- One of the modules added by NIFI-9610 when nifi-processor-utils was split into separate modules. -->
<artifactId>nifi-event-put</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- No version is declared for nifi-api here; presumably it is managed by an ancestor POM (confirm). -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<!-- Version is written out explicitly and equals the parent version above; change both together. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<!-- Provided scope: needed to compile but not bundled with this artifact. The former nifi-processor-utils
POM noted that modules using it are expected to have this API available, typically through a NAR dependency. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -20,16 +20,8 @@
<artifactId>nifi-extension-utils</artifactId> <artifactId>nifi-extension-utils</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-listed-entity</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<description>
This nifi-processor-utils module is designed to capture common patterns
and utilities that can be leveraged by other processors or components to
help promote reuse. These patterns may become framework level features
or may simply be made available through this utility. It is ok for this
module to have dependencies but care should be taken when adding dependencies
as this increases the cost of utilizing this module in various nars.
</description>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -42,24 +34,11 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-socket-ssl</artifactId> <artifactId>nifi-record-serialization-service-api</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!-- Other modules using nifi-processor-utils are expected to have this API available, typically through a NAR dependency -->
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-record</artifactId>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
@ -71,12 +50,6 @@
<artifactId>nifi-distributed-cache-client-service-api</artifactId> <artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<!-- Dependency marked as provided, not test, because ListProcessorTestWatcher uses TestWatcher --> <!-- Dependency marked as provided, not test, because ListProcessorTestWatcher uses TestWatcher -->
<groupId>junit</groupId> <groupId>junit</groupId>
@ -85,44 +58,13 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId> <artifactId>nifi-mock</artifactId>
<scope>compile</scope> <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<scope>compile</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId> <artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<profiles>
<profile>
<!-- This profile, activating when compiling on Java versions above 1.8, provides configuration changes to
allow NiFi to be compiled on those JDKs. -->
<id>jigsaw</id>
<activation>
<jdk>(1.8,)</jdk>
</activation>
<dependencies>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project> </project>

View File

@ -49,7 +49,6 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.glassfish.jersey.internal.guava.Predicates; import org.glassfish.jersey.internal.guava.Predicates;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -208,7 +207,7 @@ public class TestAbstractListProcessor {
runner.run(); runner.run();
// Verify the local persistence file is removed // Verify the local persistence file is removed
Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists()); assertTrue("Failed to remove persistence file", !persistenceFile.exists());
// Verify the state manager now maintains the associated state // Verify the state manager now maintains the associated state
final Map<String, String> expectedState = new HashMap<>(); final Map<String, String> expectedState = new HashMap<>();

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.dispatcher;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import java.nio.ByteBuffer;
/**
 * Immutable holder that pairs a {@link ByteBuffer} with an {@link SSLSocketChannel} so that both can be
 * attached to a selector key as a single object. The constructor performs no null checks, so either
 * reference may be absent.
 */
public class SocketChannelAttachment {

    private final SSLSocketChannel sslSocketChannel;
    private final ByteBuffer byteBuffer;

    /**
     * @param byteBuffer buffer carried by this attachment
     * @param sslSocketChannel SSL channel carried by this attachment
     */
    public SocketChannelAttachment(final ByteBuffer byteBuffer, final SSLSocketChannel sslSocketChannel) {
        this.sslSocketChannel = sslSocketChannel;
        this.byteBuffer = byteBuffer;
    }

    /**
     * @return the SSL channel supplied at construction
     */
    public SSLSocketChannel getSslSocketChannel() {
        return this.sslSocketChannel;
    }

    /**
     * @return the buffer supplied at construction
     */
    public ByteBuffer getByteBuffer() {
        return this.byteBuffer;
    }
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.listen.handler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventQueue;
import java.nio.channels.SelectionKey;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
/**
 * Shared state for channel handlers. Subclasses supply {@link Runnable#run()}; this class only stores the
 * collaborators passed to its constructor and wraps the supplied queue in an {@link EventQueue}.
 *
 * @param <E> type of event produced by the handler
 * @param <D> type of dispatcher that owns the handler
 */
public abstract class ChannelHandler<E extends Event, D extends ChannelDispatcher> implements Runnable {

    protected final SelectionKey key;
    protected final D dispatcher;
    protected final Charset charset;
    protected final EventFactory<E> eventFactory;
    protected final EventQueue<E> events;
    protected final ComponentLog logger;

    /**
     * @param key selection key this handler is associated with
     * @param dispatcher dispatcher that owns this handler
     * @param charset character set made available to subclasses
     * @param eventFactory factory made available to subclasses for creating events
     * @param events queue that is wrapped, together with the logger, in an {@link EventQueue}
     * @param logger component logger, also handed to the {@link EventQueue}
     */
    public ChannelHandler(final SelectionKey key,
                          final D dispatcher,
                          final Charset charset,
                          final EventFactory<E> eventFactory,
                          final BlockingQueue<E> events,
                          final ComponentLog logger) {
        this.logger = logger;
        this.events = new EventQueue<>(events, logger);
        this.eventFactory = eventFactory;
        this.charset = charset;
        this.dispatcher = dispatcher;
        this.key = key;
    }
}

View File

@ -1,471 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.list;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestWatcher;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
import org.apache.nifi.processor.util.list.TestAbstractListProcessor.ConcreteListProcessor;
import org.apache.nifi.processor.util.list.TestAbstractListProcessor.DistributedCache;
import static org.junit.Assert.assertEquals;
public class ITAbstractListProcessor {
/**
 * Truncates the current wall-clock time to the requested precision.
 *
 * @param targetPrecision unit to truncate to (e.g. SECONDS or MINUTES)
 * @return the current time in milliseconds with everything finer than {@code targetPrecision} dropped
 */
private static long getCurrentTimestampMillis(final TimeUnit targetPrecision) {
    final long nowMillis = System.currentTimeMillis();
    // Converting down to the coarser unit discards the remainder; converting back yields milliseconds again.
    final long nowInTargetUnit = targetPrecision.convert(nowMillis, TimeUnit.MILLISECONDS);
    return targetPrecision.toMillis(nowInTargetUnit);
}
/**
 * @param targetPrecision timestamp precision to look up
 * @return twice the value that {@code AbstractListProcessor.LISTING_LAG_MILLIS} holds for that precision
 */
private static long getSleepMillis(final TimeUnit targetPrecision) {
    // Doubling the lag gives the callers a safety margin when they sleep between runs.
    return 2 * AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision);
}
// Sleep duration used by the tests that run at millisecond precision (see getSleepMillis).
private static final long DEFAULT_SLEEP_MILLIS = getSleepMillis(TimeUnit.MILLISECONDS);
// Both fields are assigned in setup() before each test; they are still null while the rule below is constructed.
private ConcreteListProcessor proc;
private TestRunner runner;
// The three arguments are lambdas so that proc and runner are dereferenced only when a supplier is invoked,
// not during field initialization. Keep them as lambdas: a bound method reference such as proc::getEntityList
// would evaluate proc immediately and fail on null.
// NOTE(review): ListProcessorTestWatcher is defined elsewhere; presumably it reports this data when a test
// fails. Confirm against that class.
@Rule
public TestWatcher dumpState = new ListProcessorTestWatcher(
// Supplier 1: processor state as a map.
// NOTE(review): this reads Scope.LOCAL while the tests in this class write and assert Scope.CLUSTER state.
// Confirm LOCAL is intended.
() -> {
try {
return runner.getStateManager().getState(Scope.LOCAL).toMap();
} catch (IOException e) {
throw new RuntimeException("Failed to retrieve state", e);
}
},
// Supplier 2: the entities currently registered on the test processor.
() -> proc.getEntityList(),
// Supplier 3: FlowFiles routed to REL_SUCCESS, each element cast to FlowFile.
() -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
);
/**
 * Creates a fresh processor and a test runner bound to it before every test.
 */
@Before
public void setup() {
    final ConcreteListProcessor processor = new ConcreteListProcessor();
    proc = processor;
    runner = TestRunners.newTestRunner(processor);
}
// JUnit temporary-folder rule. NOTE(review): not referenced in the visible part of this class; confirm it is still needed.
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
/**
 * Entities that are already old enough are emitted on the very first run.
 * <ul>
 * <li>Their last-modified timestamp is old enough to treat them as completely written, with no further
 * entities expected at the same timestamp.</li>
 * <li>This is the expected behavior when a processor is scheduled infrequently, such as hourly or daily.</li>
 * </ul>
 */
@Test
public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
    // Both entities pre-date the first run by the millisecond-precision sleep interval.
    final long agedTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS);
    proc.addEntity("name", "id", agedTimestamp);
    proc.addEntity("name", "id2", agedTimestamp);

    // First run: both pre-existing entities are emitted.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    runner.clearTransferState();

    // Wait out the lag period so an immediate re-run cannot mask a duplicate listing.
    Thread.sleep(DEFAULT_SLEEP_MILLIS);

    // Second run with nothing new: nothing is emitted.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
}
/**
 * Shared scenario: entities stamped with the current (truncated) time are held back on the run that first
 * sees them and emitted on a later run, after the lag for the given precision has elapsed.
 *
 * @param targetPrecision timestamp precision to configure on the processor and to truncate the timestamp to
 */
private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException {
    // An initial run happens before the timestamp is taken and before the precision property is set.
    runner.run();

    final long truncatedNow = getCurrentTimestampMillis(targetPrecision);
    setTargetSystemTimestampPrecision(targetPrecision);
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

    proc.addEntity("name", "id", truncatedNow);
    proc.addEntity("name", "id2", truncatedNow);

    // The run that first sees the entities skips them to avoid write synchronization issues.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Wait out the lag period so the next run is not treated as an immediate re-schedule.
    Thread.sleep(getSleepMillis(targetPrecision));

    // Nothing new was added, yet the two held-back entities are now released.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
}
/**
 * Newly created entities must wait until it is clear that no more entities share their timestamp.
 * <ul>
 * <li>Entities carrying the latest timestamp seen in an iteration are postponed.</li>
 * <li>If they are still the latest entities in the next iteration, they are listed.</li>
 * </ul>
 * This variant runs the scenario at millisecond precision.
 */
@Test
public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception {
    final TimeUnit precision = TimeUnit.MILLISECONDS;
    testPreviouslySkippedEntriesEmmitedOnNextIteration(precision);
}
/**
 * Runs the scenario of
 * {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()}
 * while simulating a target filesystem whose timestamps are only precise to the second.
 */
@Test
public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception {
    final TimeUnit precision = TimeUnit.SECONDS;
    testPreviouslySkippedEntriesEmmitedOnNextIteration(precision);
}
/**
 * Runs the scenario of
 * {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()}
 * while simulating a target filesystem whose timestamps are only precise to the minute.
 */
@Test
public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception {
    final TimeUnit precision = TimeUnit.MINUTES;
    testPreviouslySkippedEntriesEmmitedOnNextIteration(precision);
}
/**
 * Shared scenario: once two entities have been emitted, re-adding an entity with the same or an older
 * timestamp emits nothing, while an entity one precision unit newer is emitted. Finishes by checking that
 * {@code verify} reports a single successful result.
 *
 * @param targetPrecision timestamp precision to configure on the processor and to truncate the timestamp to
 */
private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException {
    final long baseTimestamp = getCurrentTimestampMillis(targetPrecision);
    final long oneUnitMillis = targetPrecision.toMillis(1);

    setTargetSystemTimestampPrecision(targetPrecision);
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

    proc.addEntity("name", "id", baseTimestamp);
    proc.addEntity("name", "id2", baseTimestamp);

    // The run that first sees the entities skips them to avoid write synchronization issues.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Wait out the lag period so the next run is not treated as an immediate re-schedule.
    Thread.sleep(getSleepMillis(targetPrecision));

    // The two entities seen earlier are now released.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    runner.clearTransferState();

    // Re-adding an already emitted entity does not emit it again.
    proc.addEntity("name", "id2", baseTimestamp);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // An entity older than the already processed ones is not listed.
    proc.addEntity("name", "id3", baseTimestamp - oneUnitMillis);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // An entity whose timestamp equals the last processed timestamp is not listed either.
    proc.addEntity("name", "id2", baseTimestamp);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // An entity one precision unit newer than the base timestamp is emitted.
    proc.addEntity("name", "id2", baseTimestamp + oneUnitMillis);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
    runner.clearTransferState();

    final List<ConfigVerificationResult> verification = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
    assertEquals(1, verification.size());
    assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verification.get(0).getOutcome());
}
/**
 * Sets TARGET_SYSTEM_TIMESTAMP_PRECISION on the runner to the value matching the given unit.
 * Units other than MILLISECONDS, SECONDS and MINUTES leave the property untouched.
 *
 * @param targetPrecision unit to translate into a precision property value
 */
private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
    switch (targetPrecision) {
        case MILLISECONDS:
            runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS);
            return;
        case SECONDS:
            runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS);
            return;
        case MINUTES:
            runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES);
            return;
        default:
            // No matching precision value: keep whatever the runner already has.
            return;
    }
}
/**
 * Runs the only-new-entries scenario at millisecond precision.
 */
@Test
public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
    final TimeUnit precision = TimeUnit.MILLISECONDS;
    testOnlyNewEntriesEmitted(precision);
}
/**
 * Runs the only-new-entries scenario at second precision.
 */
@Test
public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
    final TimeUnit precision = TimeUnit.SECONDS;
    testOnlyNewEntriesEmitted(precision);
}
/**
 * Runs the only-new-entries scenario at minute precision.
 */
@Test
public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
    final TimeUnit precision = TimeUnit.MINUTES;
    testOnlyNewEntriesEmitted(precision);
}
/**
 * Emulates a restart: CLUSTER state already records both entities as listed and processed before the
 * processor has run, so neither may be emitted again, while a later entity must be.
 */
@Test
public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
    final long restartTimestamp = System.currentTimeMillis();
    proc.addEntity("name", "id", restartTimestamp);
    proc.addEntity("name", "id2", restartTimestamp);

    // Seed state as if the processor had handled both entities before being restarted.
    final String storedTimestamp = Long.toString(restartTimestamp);
    final Map<String, String> priorState = new HashMap<>();
    priorState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, storedTimestamp);
    priorState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, storedTimestamp);
    priorState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
    priorState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2");
    runner.getStateManager().setState(priorState, Scope.CLUSTER);

    // First run after the emulated restart: the recorded entities are skipped.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Wait out the lag period so the next run is not treated as an immediate re-schedule.
    Thread.sleep(DEFAULT_SLEEP_MILLIS);

    // Even after the lag has passed, the recorded entities are still skipped.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Re-adding a recorded entity emits nothing.
    proc.addEntity("name", "id2", restartTimestamp);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // An entity one millisecond older than the recorded timestamp emits nothing.
    proc.addEntity("name", "id3", restartTimestamp - 1);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Adding the recorded entity once more still emits nothing.
    proc.addEntity("name", "id2", restartTimestamp);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // An entity one millisecond newer than the recorded timestamp is emitted.
    proc.addEntity("name", "id2", restartTimestamp + 1);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
    runner.clearTransferState();
}
/**
 * Checks the CLUSTER-scope state written after the first and the second run when a distributed cache
 * service is configured on the processor.
 */
@Test
public void testStateStoredInClusterStateManagement() throws Exception {
    final DistributedCache cacheService = new DistributedCache();
    runner.addControllerService("cache", cacheService);
    runner.enableControllerService(cacheService);
    runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");

    final long entityTimestamp = System.currentTimeMillis();
    final String entityTimestampText = String.valueOf(entityTimestamp);
    proc.addEntity("name", "id", entityTimestamp);
    runner.run();

    // After the first run only the listed timestamp is recorded; nothing has been processed yet.
    final Map<String, String> expectedState = new HashMap<>();
    expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, entityTimestampText);
    expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0");
    runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);

    Thread.sleep(DEFAULT_SLEEP_MILLIS);
    runner.run();

    // After the second run the processed timestamp and the entity identifier are recorded as well.
    expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, entityTimestampText);
    expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, entityTimestampText);
    expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
    runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
}
/**
 * After CLUSTER state is cleared, entities that were already emitted are emitted again.
 */
@Test
public void testResumeListingAfterClearingState() throws Exception {
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

    final long eventTimestamp = System.currentTimeMillis();
    proc.addEntity("name", "id", eventTimestamp);
    proc.addEntity("name", "id2", eventTimestamp);

    // The entities carry the latest timestamp, so the first run holds them back.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

    // Once the lag period has passed, both entities are transferred.
    Thread.sleep(DEFAULT_SLEEP_MILLIS);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    runner.clearTransferState();

    // With the stored state in place, the same entities are not transferred again.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Clear the processor's state, removing the stored timestamps.
    runner.getStateManager().clear(Scope.CLUSTER);
    final Map<String, String> clearedState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
    Assert.assertEquals("State is not empty for this component after clearing", 0, clearedState.size());

    // The original entities are transferred once more.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    runner.clearTransferState();
}
/**
 * After the processor is notified that its node was elected primary, listing resumes: an entity added
 * after the notification is not emitted by the first run that follows, and is emitted by the run after that.
 */
@Test
public void testResumeListingAfterBecamePrimary() throws Exception {
    final long initialTimestamp = System.currentTimeMillis();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);

    proc.addEntity("name", "id", initialTimestamp);
    proc.addEntity("name", "id2", initialTimestamp);

    // The entities carry the latest timestamp, so the first run holds them back.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // Once the lag period has passed, both entities are transferred.
    Thread.sleep(DEFAULT_SLEEP_MILLIS);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    runner.clearTransferState();

    // Emulate the re-election process.
    proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);

    // A new entity arrives after the election (stray empty statement after this call removed).
    proc.addEntity("name", "id3", initialTimestamp + 1);

    // The first run skips the listing because the determined timestamp is the same as the last listing.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // The cluster state has now been read, so the next run performs the listing.
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
    runner.clearTransferState();
}
/**
 * Verifies that the cluster state written after a listing contains only the entries
 * for the most recent listing: the two timestamp keys plus one identifier key per
 * entity sharing the latest timestamp.
 *
 * @throws Exception if sleeping between runs or reading state fails
 */
@Test
public void testOnlyNewStateStored() throws Exception {
    runner.run();

    final long firstTimestamp = System.currentTimeMillis();
    final String firstTimestampValue = Long.toString(firstTimestamp);

    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    proc.addEntity("name", "id", firstTimestamp);
    proc.addEntity("name", "id2", firstTimestamp);

    // The run immediately after adding the entities is expected to emit nothing
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
    runner.clearTransferState();

    // After a pause both entities are emitted
    Thread.sleep(DEFAULT_SLEEP_MILLIS);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
    runner.clearTransferState();

    final StateMap firstState = runner.getStateManager().getState(Scope.CLUSTER);
    assertEquals(2, firstState.getVersion());

    // Two timestamp keys plus one identifier key for each of the two entities
    final Map<String, String> firstEntries = firstState.toMap();
    assertEquals(4, firstEntries.size());
    assertEquals(firstTimestampValue, firstEntries.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
    assertEquals(firstTimestampValue, firstEntries.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
    assertEquals("id", firstEntries.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
    assertEquals("id2", firstEntries.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1"));

    // A single newer entity arrives and is emitted on the next run
    final long secondTimestamp = firstTimestamp + 1;
    final String secondTimestampValue = Long.toString(secondTimestamp);
    proc.addEntity("new name", "new id", secondTimestamp);
    runner.run();
    runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
    runner.clearTransferState();

    // The stored state now holds only the newer entity: two timestamp keys and one identifier
    final StateMap secondState = runner.getStateManager().getState(Scope.CLUSTER);
    assertEquals(3, secondState.getVersion());
    assertEquals(3, secondState.toMap().size());
    assertEquals(secondTimestampValue, secondState.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
    // Processed timestamp is now caught up
    assertEquals(secondTimestampValue, secondState.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
    assertEquals("new id", secondState.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
}
}

View File

@ -0,0 +1,41 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- groupId and version of this module are inherited from the parent -->
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-extension-utils</artifactId>
        <version>1.16.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-put-pattern</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <!-- No version declared here; presumably supplied by dependency management in an ancestor POM -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
            <version>1.16.0-SNAPSHOT</version>
        </dependency>
        <!-- Test scope only; version likewise not declared in this file -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

View File

@ -27,17 +27,20 @@
</description> </description>
<modules> <modules>
<module>nifi-record-utils</module> <module>nifi-bin-manager</module>
<module>nifi-hadoop-utils</module>
<module>nifi-processor-utils</module>
<module>nifi-reporting-utils</module>
<module>nifi-syslog-utils</module>
<module>nifi-database-utils</module> <module>nifi-database-utils</module>
<module>nifi-database-test-utils</module> <module>nifi-database-test-utils</module>
<module>nifi-service-utils</module> <module>nifi-event-listen</module>
<module>nifi-prometheus-utils</module> <module>nifi-event-put</module>
<module>nifi-kerberos-test-utils</module>
<module>nifi-event-transport</module> <module>nifi-event-transport</module>
<module>nifi-hadoop-utils</module>
<module>nifi-kerberos-test-utils</module>
<module>nifi-listed-entity</module>
<module>nifi-prometheus-utils</module>
<module>nifi-put-pattern</module>
<module>nifi-record-utils</module>
<module>nifi-reporting-utils</module>
<module>nifi-service-utils</module>
<module>nifi-syslog-utils</module>
</modules> </modules>
</project> </project>

View File

@ -56,7 +56,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-listed-entity</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
@ -121,6 +121,11 @@
<artifactId>json</artifactId> <artifactId>json</artifactId>
<version>1.8</version> <version>1.8</version>
</dependency> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId> <artifactId>jcl-over-slf4j</artifactId>

View File

@ -38,7 +38,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-put-pattern</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
@ -167,6 +172,14 @@
<artifactId>nifi-hadoop-record-utils</artifactId> <artifactId>nifi-hadoop-record-utils</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.github.stephenc.findbugs</groupId> <groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId> <artifactId>findbugs-annotations</artifactId>

View File

@ -33,7 +33,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-put-pattern</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -38,7 +38,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-put-pattern</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
@ -175,6 +180,14 @@
<artifactId>nifi-hadoop-record-utils</artifactId> <artifactId>nifi-hadoop-record-utils</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.github.stephenc.findbugs</groupId> <groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId> <artifactId>findbugs-annotations</artifactId>

View File

@ -41,9 +41,29 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-utils</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-put</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId> <artifactId>nifi-ssl-context-service</artifactId>

View File

@ -36,7 +36,27 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-put-pattern</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-bin-manager</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-listed-entity</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-put</artifactId>
<version>1.16.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-listen</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -41,7 +41,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-bin-manager</artifactId>
<version>1.16.0-SNAPSHOT</version> <version>1.16.0-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>