diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java index c7e9c22197..c64ea1cd27 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/exception/FlowFileAccessException.java @@ -18,7 +18,7 @@ package org.apache.nifi.processor.exception; /** * Indicates an issue occurred while accessing the content of a FlowFile, such - * as an IOException. + * as an IOException, or obtaining a reference to the FlowFile * */ public class FlowFileAccessException extends RuntimeException { diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java index 0dad62ceee..002ecd2955 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/HashMapSnapshot.java @@ -17,6 +17,12 @@ package org.apache.nifi.wali; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -37,12 +43,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wali.SerDe; -import org.wali.SerDeFactory; -import org.wali.UpdateType; - public class HashMapSnapshot implements WriteAheadSnapshot, RecordLookup { private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class); private static final int ENCODING_VERSION = 1; @@ -216,10 +216,14 @@ public class HashMapSnapshot implements WriteAheadSnapshot, RecordLookup prepareSnapshot(final long maxTransactionId) { - return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapLocations), maxTransactionId); + return prepareSnapshot(maxTransactionId, this.swapLocations); + } + + @Override + public SnapshotCapture prepareSnapshot(final long maxTransactionId, final Set swapFileLocations) { + return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapFileLocations), maxTransactionId); } private int getVersion() { diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java index 11eb31cdd3..240a212293 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/SequentialAccessWriteAheadLog.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -65,6 +66,7 @@ public class SequentialAccessWriteAheadLog implements WriteAheadRepository private final File journalsDirectory; private final SerDeFactory serdeFactory; private final SyncListener syncListener; + private final Set recoveredSwapLocations = new HashSet<>(); private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock(); private final Lock journalReadLock = journalRWLock.readLock(); @@ -144,6 +146,7 @@ public class SequentialAccessWriteAheadLog implements WriteAheadRepository final long recoverStart = System.nanoTime(); recovered = true; snapshotRecovery = snapshot.recover(); + this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations()); final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart); @@ -212,7 +215,9 @@ public class SequentialAccessWriteAheadLog implements WriteAheadRepository final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS); logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis); - checkpoint(); + this.recoveredSwapLocations.addAll(swapLocations); + + checkpoint(this.recoveredSwapLocations); return recoveredRecords.values(); } @@ -238,11 +243,15 @@ public class SequentialAccessWriteAheadLog implements WriteAheadRepository throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed"); } - return snapshotRecovery.getRecoveredSwapLocations(); + return Collections.unmodifiableSet(this.recoveredSwapLocations); } @Override public int checkpoint() throws IOException { + return checkpoint(null); + } + + private int checkpoint(final Set swapLocations) throws IOException { final SnapshotCapture snapshotCapture; final long startNanos = System.nanoTime(); @@ -276,7 +285,12 @@ public class SequentialAccessWriteAheadLog implements WriteAheadRepository final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile); existingJournals = (existingFiles == null) ? new File[0] : existingFiles; - snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1); + if (swapLocations == null) { + snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1); + } else { + snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1, swapLocations); + } + // Create a new journal. We name the journal file .journal but it is possible // that we could have an empty journal file already created. If this happens, we don't want to create diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java index a4cbcd2892..fd7cfd8a67 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/WriteAheadSnapshot.java @@ -19,10 +19,13 @@ package org.apache.nifi.wali; import java.io.IOException; import java.util.Collection; +import java.util.Set; public interface WriteAheadSnapshot { SnapshotCapture prepareSnapshot(long maxTransactionId); + SnapshotCapture prepareSnapshot(long maxTransactionId, Set swapLocations); + void writeSnapshot(SnapshotCapture snapshot) throws IOException; SnapshotRecovery recover() throws IOException; diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java index 05fc8a57cd..b7f18abbf0 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/WriteAheadRepository.java @@ -65,7 +65,7 @@ public interface WriteAheadRepository { * if power is lost or the Operating System crashes * @throws IOException if failure to update repo * @throws IllegalArgumentException if multiple records within the given - * Collection have the same ID, as specified by {@link Record#getId()} + * Collection have the same ID, as specified by {@link SerDe#getRecordIdentifier(Object)} * method * * @return the index of the Partition that performed the update diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java index 6560c0aeb8..b9ff2497c9 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java @@ -16,14 +16,14 @@ */ package org.apache.nifi.controller.repository; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; + import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.List; -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; - /** * Implementations must be thread safe * @@ -128,4 +128,24 @@ public interface FlowFileRepository extends Closeable { * @throws IOException if swap fails */ void swapFlowFilesIn(String swapLocation, List flowFileRecords, FlowFileQueue flowFileQueue) throws IOException; + + /** + *

+ * Determines whether or not the given swap location suffix is a valid, known location according to this FlowFileRepository. Note that while + * the {@link #swapFlowFilesIn(String, List, FlowFileQueue)} and {@link #swapFlowFilesOut(List, FlowFileQueue, String)} methods expect + * a full "swap location" this method expects only the "suffix" of a swap location. For example, if the location points to a file, this method + * would expect only the filename, not the full path. + *

+ * + *

+ * This method differs from the others because the other methods want to store the swap location or recover from a given location. However, + * this method is used to verify that the location is known. If for any reason, NiFi is stopped, its FlowFile Repository relocated to a new + * location (for example, a different disk partition), and restarted, the swap location would not match if we used the full location. Therefore, + * by using only the "suffix" (i.e. the filename for a file-based implementation), we can avoid worrying about relocation. + *

+ * + * @param swapLocationSuffix the suffix of the location to check + * @return true if the swap location is known and valid, false otherwise + */ + boolean isValidSwapLocationSuffix(String swapLocationSuffix); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 6bb1ea82ef..4b9af46b30 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -239,6 +239,7 @@ src/test/resources/bye.txt src/test/resources/old-swap-file.swap src/test/resources/xxe_template.xml + src/test/resources/swap/444-old-swap-file.swap diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 5f8f925171..b2717c28d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; @@ -27,6 +28,8 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.swap.SchemaSwapDeserializer; import org.apache.nifi.controller.swap.SchemaSwapSerializer; import org.apache.nifi.controller.swap.SimpleSwapDeserializer; +import org.apache.nifi.controller.swap.StandardSwapContents; +import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.controller.swap.SwapDeserializer; import org.apache.nifi.controller.swap.SwapSerializer; import org.apache.nifi.events.EventReporter; @@ -95,14 +98,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } public FileSystemSwapManager(final NiFiProperties nifiProperties) { - final Path flowFileRepoPath = nifiProperties.getFlowFileRepositoryPath(); + this(nifiProperties.getFlowFileRepositoryPath()); + } + public FileSystemSwapManager(final Path flowFileRepoPath) { this.storageDirectory = flowFileRepoPath.resolve("swap").toFile(); if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath()); } } + @Override public synchronized void initialize(final SwapManagerInitializationContext initializationContext) { this.claimManager = initializationContext.getResourceClaimManager(); @@ -152,6 +158,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager { @Override public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { final File swapFile = new File(swapLocation); + + final boolean validLocation = flowFileRepository.isValidSwapLocationSuffix(swapFile.getName()); + if (!validLocation) { + warn("Cannot swap in FlowFiles from location " + swapLocation + " because the FlowFile Repository does not know about this Swap Location. " + + "This file should be manually removed. This typically occurs when a Swap File is written but the FlowFile Repository is not updated yet to reflect this. " + + "This is generally not a cause for concern, but may be indicative of a failure to update the FlowFile Repository."); + final SwapSummary swapSummary = new StandardSwapSummary(new QueueSize(0, 0), 0L, Collections.emptyList()); + return new StandardSwapContents(swapSummary, Collections.emptyList()); + } + final SwapContents swapContents = peek(swapLocation, flowFileQueue); flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swapContents.getFlowFiles(), flowFileQueue); @@ -311,7 +327,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - Collections.sort(swapLocations, new SwapFileComparator()); + swapLocations.sort(new SwapFileComparator()); return swapLocations; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index 058c7149e5..df19f442e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -180,19 +180,23 @@ public class SwappablePriorityQueue { int flowFilesSwappedOut = 0; final List swapLocations = new ArrayList<>(numSwapFiles); for (int i = 0; i < numSwapFiles; i++) { + long bytesSwappedThisIteration = 0L; + // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records final List toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE); for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) { final FlowFileRecord flowFile = tempQueue.poll(); toSwap.add(flowFile); - bytesSwappedOut += flowFile.getSize(); - flowFilesSwappedOut++; + bytesSwappedThisIteration += flowFile.getSize(); } try { Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue. final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName); swapLocations.add(swapLocation); + + bytesSwappedOut += bytesSwappedThisIteration; + flowFilesSwappedOut += toSwap.size(); } catch (final IOException ioe) { tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index cc3ac19905..216449cfa7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -161,7 +161,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // so that we are able to aggregate many into a single Fork Event. private final Map forkEventBuilders = new HashMap<>(); - private Checkpoint checkpoint = new Checkpoint(); + private Checkpoint checkpoint = null; private final ContentClaimWriteCache claimCache; public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) { @@ -1489,7 +1489,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) { final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile); - records.put(flowFile.getId(), record); + + // Ensure that the checkpoint does not have a FlowFile with the same ID already. This should not occur, + // but this is a safety check just to make sure, because if it were to occur, and we did process the FlowFile, + // we would have a lot of problems, since the map is keyed off of the FlowFile ID. + if (this.checkpoint != null) { + final StandardRepositoryRecord checkpointedRecord = this.checkpoint.getRecord(flowFile); + handleConflictingId(flowFile, connection, checkpointedRecord); + } + + final StandardRepositoryRecord existingRecord = records.putIfAbsent(flowFile.getId(), record); + handleConflictingId(flowFile, connection, existingRecord); // Ensure that we have no conflicts + flowFilesIn++; contentSizeIn += flowFile.getSize(); @@ -1503,6 +1514,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE incrementConnectionOutputCounts(connection, flowFile); } + private void handleConflictingId(final FlowFileRecord flowFile, final Connection connection, final StandardRepositoryRecord conflict) { + if (conflict == null) { + // No conflict + return; + } + + LOG.error("Attempted to pull {} from {} but the Session already has a FlowFile with the same ID ({}): {}, which was pulled from {}. This means that the system has two FlowFiles with the" + + " same ID, which should not happen.", flowFile, connection, flowFile.getId(), conflict.getCurrent(), conflict.getOriginalQueue()); + connection.getFlowFileQueue().put(flowFile); + + rollback(true, false); + throw new FlowFileAccessException("Attempted to pull a FlowFile with ID " + flowFile.getId() + " from Connection " + + connection + " but a FlowFile with that ID already exists in the session"); + } + @Override public void adjustCounter(final String name, final long delta, final boolean immediate) { verifyTaskActive(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java index dee5346553..da714a652a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java @@ -16,16 +16,16 @@ */ package org.apache.nifi.controller.repository; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + /** *

* An in-memory implementation of the {@link FlowFileRepository}. Upon restart, all FlowFiles will be discarded, including those that have been swapped out by a {@link FlowFileSwapManager}. @@ -137,4 +137,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository { public void swapFlowFilesOut(List swappedOut, FlowFileQueue queue, String swapLocation) throws IOException { } + @Override + public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) { + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index b5a61c6e09..d8e45f2c95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -16,6 +16,19 @@ */ package org.apache.nifi.controller.repository; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.wali.SequentialAccessWriteAheadLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.SyncListener; +import org.wali.WriteAheadRepository; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -42,19 +55,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.wali.SequentialAccessWriteAheadLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wali.MinimalLockingWriteAheadLog; -import org.wali.SyncListener; -import org.wali.WriteAheadRepository; - /** *

* Implements FlowFile Repository using WALI as the backing store. @@ -101,6 +101,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private final int numPartitions; private final ScheduledExecutorService checkpointExecutor; + private final Set swapLocationSuffixes = new HashSet<>(); // guraded by synchronizing on object itself + // effectively final private WriteAheadRepository wal; private RepositoryRecordSerdeFactory serdeFactory; @@ -134,7 +136,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis */ public WriteAheadFlowFileRepository() { alwaysSync = false; - checkpointDelayMillis = 0l; + checkpointDelayMillis = 0L; numPartitions = 0; checkpointExecutor = null; walImplementation = null; @@ -278,6 +280,13 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis return !resourceClaim.isInUse(); } + @Override + public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) { + synchronized (swapLocationSuffixes) { + return swapLocationSuffixes.contains(swapLocationSuffix); + } + } + private void updateRepository(final Collection records, final boolean sync) throws IOException { for (final RepositoryRecord record : records) { if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING @@ -308,6 +317,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // This does not, however, cause problems, as ContentRepository should handle this // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface. final Set claimsToAdd = new HashSet<>(); + + final Set swapLocationsAdded = new HashSet<>(); + final Set swapLocationsRemoved = new HashSet<>(); + for (final RepositoryRecord record : records) { if (record.getType() == RepositoryRecordType.DELETE) { // For any DELETE record that we have, if claim is destructible, mark it so @@ -324,6 +337,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) { claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } + } else if (record.getType() == RepositoryRecordType.SWAP_OUT) { + final String swapLocation = record.getSwapLocation(); + swapLocationsAdded.add(swapLocation); + swapLocationsRemoved.remove(swapLocation); + } else if (record.getType() == RepositoryRecordType.SWAP_IN) { + final String swapLocation = record.getSwapLocation(); + swapLocationsRemoved.add(swapLocation); + swapLocationsAdded.remove(swapLocation); } final List transientClaims = record.getTransientClaims(); @@ -336,6 +357,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } } + // If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes. + if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) { + synchronized (swapLocationSuffixes) { + swapLocationsRemoved.forEach(loc -> swapLocationSuffixes.remove(getLocationSuffix(loc))); + swapLocationsAdded.forEach(loc -> swapLocationSuffixes.add(getLocationSuffix(loc))); + } + } + if (!claimsToAdd.isEmpty()) { // Get / Register a Set for the given Partiton Index final Integer partitionKey = Integer.valueOf(partitionIndex); @@ -352,6 +381,20 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } } + protected static String getLocationSuffix(final String swapLocation) { + if (swapLocation == null) { + return null; + } + + final String withoutTrailing = (swapLocation.endsWith("/") && swapLocation.length() > 1) ? swapLocation.substring(0, swapLocation.length() - 1) : swapLocation; + final int lastIndex = withoutTrailing.lastIndexOf("/"); + if (lastIndex < 0 || lastIndex >= withoutTrailing.length() - 1) { + return withoutTrailing; + } + + return withoutTrailing.substring(lastIndex + 1); + } + @Override public void onSync(final int partitionIndex) { final BlockingQueue claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex)); @@ -407,6 +450,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // update WALI to indicate that the records were swapped out. wal.update(repoRecords, true); + synchronized (this.swapLocationSuffixes) { + this.swapLocationSuffixes.add(getLocationSuffix(swapLocation)); + } + logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation}); } @@ -423,6 +470,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } updateRepository(repoRecords, true); + + synchronized (this.swapLocationSuffixes) { + this.swapLocationSuffixes.add(getLocationSuffix(swapLocation)); + } + logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); } @@ -544,6 +596,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // Repo was written using that impl, that we properly recover from the implementation. Collection recordList = wal.recoverRecords(); + final Set recoveredSwapLocations = wal.getRecoveredSwapLocations(); + synchronized (this.swapLocationSuffixes) { + recoveredSwapLocations.forEach(loc -> this.swapLocationSuffixes.add(getLocationSuffix(loc))); + logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes); + } + // If we didn't recover any records from our write-ahead log, attempt to recover records from the other implementation // of the write-ahead log. We do this in case the user changed the "nifi.flowfile.repository.wal.impl" property. // In such a case, we still want to recover the records from the previous FlowFile Repository and write them into the new one. @@ -591,7 +649,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will // return the appropriate number. flowFileSequenceGenerator.set(maxId + 1); - logger.info("Successfully restored {} FlowFiles", recordList.size() - numFlowFilesMissingQueue); + logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size()); if (numFlowFilesMissingQueue > 0) { logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java index 33b71f0fdd..a1206c7a86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockSwapManager.java @@ -17,16 +17,6 @@ package org.apache.nifi.controller; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; @@ -39,6 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapSummary; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + public class MockSwapManager implements FlowFileSwapManager { public final Map> swappedOut = new HashMap<>(); public int swapOutCalledCount = 0; @@ -49,10 +49,22 @@ public class MockSwapManager implements FlowFileSwapManager { public int failSwapInAfterN = -1; public Throwable failSwapInFailure = null; + private int failSwapOutAfterN = -1; + private IOException failSwapOutFailure = null; + public void setSwapInFailure(final Throwable t) { this.failSwapInFailure = t; } + public void setSwapOutFailureOnNthIteration(final int n) { + setSwapOutFailureOnNthIteration(n, null); + } + + public void setSwapOutFailureOnNthIteration(final int n, final IOException failureException) { + this.failSwapOutAfterN = n; + this.failSwapOutFailure = failureException; + } + @Override public void initialize(final SwapManagerInitializationContext initializationContext) { @@ -65,6 +77,12 @@ public class MockSwapManager implements FlowFileSwapManager { @Override public String swapOut(List flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException { swapOutCalledCount++; + + if (failSwapOutAfterN > -1 && swapOutCalledCount >= failSwapOutAfterN) { + final IOException ioe = failSwapOutFailure == null ? new IOException("Intentional Unit Test IOException on swap out call number " + swapOutCalledCount) : failSwapOutFailure; + throw ioe; + } + final String location = UUID.randomUUID().toString() + "." + partitionName; swappedOut.put(location, new ArrayList<>(flowFiles)); return location; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 46bea3155f..dd71f0e9b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -16,18 +16,6 @@ */ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; - -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; @@ -36,9 +24,30 @@ import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.stream.io.StreamUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + public class TestFileSystemSwapManager { @Test @@ -48,7 +57,7 @@ public class TestFileSystemSwapManager { final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) { final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); - Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); + when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); final FileSystemSwapManager swapManager = createSwapManager(); final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue); @@ -63,11 +72,49 @@ public class TestFileSystemSwapManager { } } + @Test + public void testFailureOnRepoSwapOut() throws IOException { + final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); - private FileSystemSwapManager createSwapManager() { - final FileSystemSwapManager swapManager = new FileSystemSwapManager(); + final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); + Mockito.doThrow(new IOException("Intentional IOException for unit test")) + .when(flowFileRepo).updateRepository(anyCollection()); + + final FileSystemSwapManager swapManager = createSwapManager(); + + final List flowFileRecords = new ArrayList<>(); + for (int i=0; i < 10000; i++) { + flowFileRecords.add(new MockFlowFileRecord(i)); + } + + try { + swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1"); + Assert.fail("Expected IOException"); + } catch (final IOException ioe) { + // expected + } + } + + @Test + public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException { + final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + when(flowFileQueue.getIdentifier()).thenReturn(""); + + final File targetDir = new File("target/swap"); + targetDir.mkdirs(); + + final File targetFile = new File(targetDir, "444-old-swap-file.swap"); + final File originalSwapFile = new File("src/test/resources/swap/444-old-swap-file.swap"); + try (final OutputStream fos = new FileOutputStream(targetFile); + final InputStream fis = new FileInputStream(originalSwapFile)) { + StreamUtils.copy(fis, fos); + } + + final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target")); final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager(); - final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class); + final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); + swapManager.initialize(new SwapManagerInitializationContext() { @Override public ResourceClaimManager getResourceClaimManager() { @@ -76,7 +123,46 @@ public class TestFileSystemSwapManager { @Override public FlowFileRepository getFlowFileRepository() { - return flowfileRepo; + return flowFileRepo; + } + + @Override + public EventReporter getEventReporter() { + return EventReporter.NO_OP; + } + }); + + when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(false); + final List recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null); + assertEquals(1, recoveredLocations.size()); + + final String firstLocation = recoveredLocations.get(0); + final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue); + assertEquals(0, emptyContents.getFlowFiles().size()); + + when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(true); + when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); + final SwapContents contents = swapManager.swapIn(firstLocation, flowFileQueue); + assertEquals(10000, contents.getFlowFiles().size()); + } + + private FileSystemSwapManager createSwapManager() { + final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); + return createSwapManager(flowFileRepo); + } + + private FileSystemSwapManager createSwapManager(final FlowFileRepository flowFileRepo) { + final FileSystemSwapManager swapManager = new FileSystemSwapManager(); + final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager(); + swapManager.initialize(new SwapManagerInitializationContext() { + @Override + public ResourceClaimManager getResourceClaimManager() { + return resourceClaimManager; + } + + @Override + public FlowFileRepository getFlowFileRepository() { + return flowFileRepo; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java index 71ad257e1b..ef1a06353d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java @@ -17,21 +17,6 @@ package org.apache.nifi.controller.queue.clustered; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.nifi.controller.MockFlowFileRecord; import org.apache.nifi.controller.MockSwapManager; import org.apache.nifi.controller.queue.DropFlowFileAction; @@ -42,16 +27,34 @@ import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.MockFlowFile; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestSwappablePriorityQueue { private MockSwapManager swapManager; - private final EventReporter eventReporter = EventReporter.NO_OP; + private final List events = new ArrayList<>(); + private EventReporter eventReporter; + private final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); private final DropFlowFileAction dropAction = (flowFiles, requestor) -> { return new QueueSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); @@ -63,11 +66,35 @@ public class TestSwappablePriorityQueue { public void setup() { swapManager = new MockSwapManager(); + events.clear(); + eventReporter = new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + events.add(message); + } + }; + when(flowFileQueue.getIdentifier()).thenReturn("unit-test"); queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local"); } + @Test + public void testSwapOutFailureLeavesCorrectQueueSize() { + swapManager.setSwapOutFailureOnNthIteration(1, null); + + for (int i = 0; i < 19999; i++) { + queue.put(new MockFlowFile(i)); + } + + assertEquals(19999, queue.size().getObjectCount()); + assertEquals(0, events.size()); + + queue.put(new MockFlowFile(20000)); + assertEquals(20000, queue.size().getObjectCount()); + assertEquals(1, events.size()); // Expect a single failure event to be emitted + } + @Test public void testPrioritizer() { final FlowFilePrioritizer prioritizer = (o1, o2) -> Long.compare(o1.getId(), o2.getId()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index efe2bd4380..7cd2fd62d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -277,14 +277,13 @@ public class TestStandardProcessSession { connList.add(conn1); connList.add(conn2); - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder() .id(1000L) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .entryDate(System.currentTimeMillis()); - flowFileQueue.put(flowFileRecord); - flowFileQueue.put(flowFileRecord); + flowFileQueue.put(flowFileRecordBuilder.build()); + flowFileQueue.put(flowFileRecordBuilder.id(1001).build()); when(connectable.getIncomingConnections()).thenReturn(connList); @@ -295,6 +294,36 @@ public class TestStandardProcessSession { verify(conn2, times(1)).poll(any(Set.class)); } + @Test + public void testHandlingOfMultipleFlowFilesWithSameId() { + // Add two FlowFiles with the same ID + for (int i=0; i < 2; i++) { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(0L) + .build(); + + flowFileQueue.put(flowFileRecord); + } + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + FlowFile ff1 = session.get(); + assertNotNull(ff1); + + session.transfer(ff1, relationship); + + try { + session.get(); + Assert.fail("Should not have been able to poll second FlowFile with same ID"); + } catch (final FlowFileAccessException e) { + // Expected + } + } + + @Test public void testCloneOriginalDataSmaller() throws IOException { final byte[] originalContent = "hello".getBytes(); @@ -416,14 +445,14 @@ public class TestStandardProcessSession { connList.add(conn1); connList.add(conn2); - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + + final StandardFlowFileRecord.Builder flowFileRecord = new StandardFlowFileRecord.Builder() .id(1000L) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .entryDate(System.currentTimeMillis()); - flowFileQueue.put(flowFileRecord); - flowFileQueue.put(flowFileRecord); + flowFileQueue.put(flowFileRecord.build()); + flowFileQueue.put(flowFileRecord.id(1001).build()); when(connectable.getIncomingConnections()).thenReturn(connList); @@ -475,14 +504,13 @@ public class TestStandardProcessSession { connList.add(conn1); connList.add(conn2); - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder() .id(1000L) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .build(); + .entryDate(System.currentTimeMillis()); - flowFileQueue.put(flowFileRecord); - flowFileQueue.put(flowFileRecord); + flowFileQueue.put(flowFileRecordBuilder.build()); + flowFileQueue.put(flowFileRecordBuilder.id(10001L).build()); when(connectable.getIncomingConnections()).thenReturn(connList); @@ -1383,10 +1411,11 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) + .id(1) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) - .build(); + .build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -1399,12 +1428,13 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) + .id(2) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) - .contentClaimOffset(1000L) - .size(1000L) - .build(); + .contentClaimOffset(1000L) + .size(1000L) + .build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. @@ -1424,10 +1454,11 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) + .id(1) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) - .build(); + .build(); flowFileQueue.put(flowFileRecord); @@ -1441,10 +1472,11 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) + .id(2) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) - .contentClaimOffset(1000L).size(1L).build(); + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. @@ -1544,11 +1576,15 @@ public class TestStandardProcessSession { @Test public void testRollbackAfterCheckpoint() { - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) + final StandardFlowFileRecord.Builder recordBuilder = new StandardFlowFileRecord.Builder() + .id(1) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) - .contentClaimOffset(0L).size(0L).build(); + .contentClaimOffset(0L) + .size(0L); + + final FlowFileRecord flowFileRecord = recordBuilder.build(); flowFileQueue.put(flowFileRecord); final FlowFile originalFlowFile = session.get(); @@ -1574,7 +1610,7 @@ public class TestStandardProcessSession { session.rollback(); - flowFileQueue.put(flowFileRecord); + flowFileQueue.put(recordBuilder.id(2).build()); assertFalse(flowFileQueue.isActiveQueueEmpty()); final FlowFile originalRound2 = session.get(); @@ -1596,8 +1632,8 @@ public class TestStandardProcessSession { session.commit(); - // FlowFile transferred back to queue - assertEquals(1, flowFileQueue.size().getObjectCount()); + // FlowFiles transferred back to queue + assertEquals(2, flowFileQueue.size().getObjectCount()); assertFalse(flowFileQueue.isUnacknowledgedFlowFile()); assertFalse(flowFileQueue.isActiveQueueEmpty()); } @@ -2116,6 +2152,11 @@ public class TestStandardProcessSession { @Override public void initialize(ResourceClaimManager claimManager) throws IOException { } + + @Override + public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) { + return false; + } } private static class MockContentRepository implements ContentRepository { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index a3ee5c16b3..1761bd8465 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -70,6 +70,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -396,6 +397,101 @@ public class TestWriteAheadFlowFileRepository { } + @Test + public void testGetLocationSuffix() { + assertEquals("/", WriteAheadFlowFileRepository.getLocationSuffix("/")); + assertEquals("", WriteAheadFlowFileRepository.getLocationSuffix("")); + assertEquals(null, WriteAheadFlowFileRepository.getLocationSuffix(null)); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt")); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/test.txt")); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/tmp/test.txt")); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("//test.txt")); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/other/file/repository/test.txt")); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt/")); + assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/test.txt/")); + } + + @Test + public void testSwapLocationsRestored() throws IOException { + final Path path = Paths.get("target/test-swap-repo"); + if (Files.exists(path)) { + FileUtils.deleteFile(path.toFile(), true); + } + + final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null)); + repo.initialize(new StandardResourceClaimManager()); + + final TestQueueProvider queueProvider = new TestQueueProvider(); + repo.loadFlowFiles(queueProvider, 0L); + + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + + final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class); + when(queue.getIdentifier()).thenReturn("1234"); + when(connection.getFlowFileQueue()).thenReturn(queue); + + queueProvider.addConnection(connection); + + StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id(1L); + ffBuilder.size(0L); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final List records = new ArrayList<>(); + final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123"); + record.setDestination(queue); + records.add(record); + + repo.updateRepository(records); + repo.close(); + + // restore + final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null)); + repo2.initialize(new StandardResourceClaimManager()); + repo2.loadFlowFiles(queueProvider, 0L); + assertTrue(repo2.isValidSwapLocationSuffix("swap123")); + assertFalse(repo2.isValidSwapLocationSuffix("other")); + repo2.close(); + } + + @Test + public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException { + final Path path = Paths.get("target/test-swap-repo"); + if (Files.exists(path)) { + FileUtils.deleteFile(path.toFile(), true); + } + + final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null)); + repo.initialize(new StandardResourceClaimManager()); + + final TestQueueProvider queueProvider = new TestQueueProvider(); + repo.loadFlowFiles(queueProvider, 0L); + + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + + final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class); + when(queue.getIdentifier()).thenReturn("1234"); + when(connection.getFlowFileQueue()).thenReturn(queue); + + queueProvider.addConnection(connection); + + StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id(1L); + ffBuilder.size(0L); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final List records = new ArrayList<>(); + final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123"); + record.setDestination(queue); + records.add(record); + + assertFalse(repo.isValidSwapLocationSuffix("swap123")); + repo.updateRepository(records); + assertTrue(repo.isValidSwapLocationSuffix("swap123")); + repo.close(); + } @Test public void testResourceClaimsIncremented() throws IOException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap new file mode 100755 index 0000000000..0176ed9f84 Binary files /dev/null and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/swap/444-old-swap-file.swap differ