NIFI-9701 - Corrected No Tracking strategy to create one flow file when using a Record Writer

- Corrected No Tracking strategy Record Writer handling for ListSFTP
- Updated temporary test files to have last modified time of epoch to avoid intermittent issue with Minimum Age filtering
- Refactored MockCacheService to separate reusable class

This closes #5885

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nathan Gough 2022-03-21 10:32:16 -04:00 committed by exceptionfactory
parent 105890ef37
commit 2fbe10b4bc
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 183 additions and 80 deletions

View File

@ -576,17 +576,17 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
entitiesForTimestamp.add(entity);
}
if (orderedEntries.size() > 0) {
for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
List<T> entities = timestampEntities.getValue();
for (T entity : entities) {
// Create the FlowFile for this path.
final Map<String, String> attributes = createAttributes(entity, context);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
if (writerSet) {
try {
createRecordsForEntities(context, session, orderedEntries);
} catch (final IOException | SchemaNotFoundException e) {
getLogger().error("Failed to write listing to FlowFile", e);
context.yield();
return;
}
} else {
createFlowFilesForEntities(context, session, orderedEntries);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
private Map storage;
public MockCacheService() {
storage = new HashMap<>();
}
@Override
public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
return false;
}
@Override
public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
return storage.containsKey(key);
}
@Override
public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
storage.put(key, value);
}
@Override
public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
return false;
}
@Override
public long removeByPattern(String regex) throws IOException {
return 0;
}
}

View File

@ -17,10 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
@ -31,7 +28,6 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -271,51 +267,4 @@ public class TestDeduplicateRecord {
}
}
private static final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
private Map storage;
public MockCacheService() {
storage = new HashMap<>();
}
@Override
public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
return false;
}
@Override
public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
return storage.containsKey(key);
}
@Override
public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
storage.put(key, value);
}
@Override
public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
return false;
}
@Override
public long removeByPattern(String regex) throws IOException {
return 0;
}
}
}

View File

@ -21,17 +21,24 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -50,15 +57,14 @@ public class TestListSFTP {
private SSHTestServer sshServer;
private String tempFileName;
private List<File> testFileNames;
@Before
public void setUp() throws Exception {
sshServer = new SSHTestServer();
sshServer.startServer();
writeTempFile();
testFileNames = new ArrayList<File>();
writeTempFile(3);
runner = TestRunners.newTestRunner(ListSFTP.class);
runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost());
runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername());
@ -66,7 +72,6 @@ public class TestListSFTP {
runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort()));
runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY);
runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
runner.assertValid();
assertVerificationSuccess();
}
@ -74,13 +79,13 @@ public class TestListSFTP {
@After
public void tearDown() throws Exception {
sshServer.stopServer();
Files.deleteIfExists(Paths.get(sshServer.getVirtualFileSystemPath()));
}
@Test
public void testRunFileFound() {
runner.run();
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
public void testRunFileFound() throws InterruptedException {
runner.run(1);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
runner.assertAllFlowFilesContainAttribute("sftp.remote.host");
runner.assertAllFlowFilesContainAttribute("sftp.remote.port");
runner.assertAllFlowFilesContainAttribute("sftp.listing.user");
@ -93,14 +98,81 @@ public class TestListSFTP {
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName);
}
@Test
public void testRunWithRecordWriter() throws InitializationException, InterruptedException {
RecordSetWriterFactory recordWriter = getCsvRecordWriter();
runner.addControllerService("csv-record-writer", recordWriter);
runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
runner.enableControllerService(recordWriter);
runner.assertValid(recordWriter);
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
}
@Test
public void testRunWithRecordWriterNoTracking() throws InitializationException, InterruptedException {
RecordSetWriterFactory recordWriter = getCsvRecordWriter();
runner.addControllerService("csv-record-writer", recordWriter);
runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
runner.enableControllerService(recordWriter);
runner.assertValid(recordWriter);
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 2);
}
@Test
public void testRunWithRecordWriterByTimestamps() throws InitializationException, InterruptedException {
RecordSetWriterFactory recordWriter = getCsvRecordWriter();
runner.addControllerService("csv-record-writer", recordWriter);
runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS);
runner.enableControllerService(recordWriter);
runner.assertValid(recordWriter);
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
}
@Test
public void testRunWithRecordWriterByEntities() throws InitializationException, InterruptedException {
RecordSetWriterFactory recordWriter = getCsvRecordWriter();
runner.addControllerService("csv-record-writer", recordWriter);
runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
runner.enableControllerService(recordWriter);
DistributedMapCacheClient dmc = new MockCacheService<>();
runner.addControllerService("dmc", dmc);
runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc");
runner.enableControllerService(dmc);
runner.assertValid(dmc);
runner.assertValid(recordWriter);
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
}
@Test
public void testFilesWithRestart() throws InitializationException, InterruptedException {
RecordSetWriterFactory recordWriter = getCsvRecordWriter();
runner.addControllerService("csv-record-writer", recordWriter);
runner.setProperty(AbstractListProcessor.RECORD_WRITER, "csv-record-writer");
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
runner.enableControllerService(recordWriter);
DistributedMapCacheClient dmc = new MockCacheService<>();
runner.addControllerService("dmc", dmc);
runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "dmc");
runner.enableControllerService(dmc);
runner.assertValid();
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
}
@Test
public void testRunFileNotFoundMinSizeFiltered() {
runner.setProperty(ListFile.MIN_SIZE, "1KB");
runner.run();
runner.run(2);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
}
@ -113,13 +185,21 @@ public class TestListSFTP {
assertEquals(Outcome.SUCCESSFUL, result.getOutcome());
}
private void writeTempFile() {
final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
try {
Files.write(file.toPath(), FILE_CONTENTS);
tempFileName = file.getName();
} catch (final IOException e) {
throw new UncheckedIOException(e);
private void writeTempFile(final int count) {
for (int i = 0; i < count; i++) {
final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
try {
Files.write(file.toPath(), FILE_CONTENTS);
file.setLastModified(0);
testFileNames.add(file);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
assert(new File(sshServer.getVirtualFileSystemPath()).listFiles().length == count);
}
private RecordSetWriterFactory getCsvRecordWriter() {
return new MockRecordWriter("name, age");
}
}