NIFI-5997: If we swap out data, ensure that we do not increment the size of the queue by the size of the data that we failed to swap out. Also, if the FlowFile Repo does not know about a given swap file, do not restore it on restart

This closes #3290.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-02-01 13:10:51 -05:00 committed by Bryan Bende
parent 2c2b47ab85
commit 83ac191736
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
18 changed files with 539 additions and 121 deletions

View File

@ -18,7 +18,7 @@ package org.apache.nifi.processor.exception;
/** /**
* Indicates an issue occurred while accessing the content of a FlowFile, such * Indicates an issue occurred while accessing the content of a FlowFile, such
* as an IOException. * as an IOException, or obtaining a reference to the FlowFile
* *
*/ */
public class FlowFileAccessException extends RuntimeException { public class FlowFileAccessException extends RuntimeException {

View File

@ -17,6 +17,12 @@
package org.apache.nifi.wali; package org.apache.nifi.wali;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -37,12 +43,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.SerDe;
import org.wali.SerDeFactory;
import org.wali.UpdateType;
public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> { public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T> {
private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class); private static final Logger logger = LoggerFactory.getLogger(HashMapSnapshot.class);
private static final int ENCODING_VERSION = 1; private static final int ENCODING_VERSION = 1;
@ -216,10 +216,14 @@ public class HashMapSnapshot<T> implements WriteAheadSnapshot<T>, RecordLookup<T
return recordMap.get(recordId); return recordMap.get(recordId);
} }
@Override @Override
public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) { public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId) {
return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapLocations), maxTransactionId); return prepareSnapshot(maxTransactionId, this.swapLocations);
}
@Override
public SnapshotCapture<T> prepareSnapshot(final long maxTransactionId, final Set<String> swapFileLocations) {
return new Snapshot(new HashMap<>(recordMap), new HashSet<>(swapFileLocations), maxTransactionId);
} }
private int getVersion() { private int getVersion() {

View File

@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -65,6 +66,7 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
private final File journalsDirectory; private final File journalsDirectory;
private final SerDeFactory<T> serdeFactory; private final SerDeFactory<T> serdeFactory;
private final SyncListener syncListener; private final SyncListener syncListener;
private final Set<String> recoveredSwapLocations = new HashSet<>();
private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock(); private final ReadWriteLock journalRWLock = new ReentrantReadWriteLock();
private final Lock journalReadLock = journalRWLock.readLock(); private final Lock journalReadLock = journalRWLock.readLock();
@ -144,6 +146,7 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
final long recoverStart = System.nanoTime(); final long recoverStart = System.nanoTime();
recovered = true; recovered = true;
snapshotRecovery = snapshot.recover(); snapshotRecovery = snapshot.recover();
this.recoveredSwapLocations.addAll(snapshotRecovery.getRecoveredSwapLocations());
final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart); final long snapshotRecoveryMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - recoverStart);
@ -212,7 +215,9 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS); final long recoveryMillis = TimeUnit.MILLISECONDS.convert(recoverNanos, TimeUnit.NANOSECONDS);
logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis); logger.info("Successfully recovered {} records in {} milliseconds. Now checkpointing to ensure that Write-Ahead Log is in a consistent state", recoveredRecords.size(), recoveryMillis);
checkpoint(); this.recoveredSwapLocations.addAll(swapLocations);
checkpoint(this.recoveredSwapLocations);
return recoveredRecords.values(); return recoveredRecords.values();
} }
@ -238,11 +243,15 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed"); throw new IllegalStateException("Cannot retrieve the Recovered Swap Locations until record recovery has been performed");
} }
return snapshotRecovery.getRecoveredSwapLocations(); return Collections.unmodifiableSet(this.recoveredSwapLocations);
} }
@Override @Override
public int checkpoint() throws IOException { public int checkpoint() throws IOException {
return checkpoint(null);
}
private int checkpoint(final Set<String> swapLocations) throws IOException {
final SnapshotCapture<T> snapshotCapture; final SnapshotCapture<T> snapshotCapture;
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
@ -276,7 +285,12 @@ public class SequentialAccessWriteAheadLog<T> implements WriteAheadRepository<T>
final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile); final File[] existingFiles = journalsDirectory.listFiles(this::isJournalFile);
existingJournals = (existingFiles == null) ? new File[0] : existingFiles; existingJournals = (existingFiles == null) ? new File[0] : existingFiles;
snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1); if (swapLocations == null) {
snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1);
} else {
snapshotCapture = snapshot.prepareSnapshot(nextTransactionId - 1, swapLocations);
}
// Create a new journal. We name the journal file <next transaction id>.journal but it is possible // Create a new journal. We name the journal file <next transaction id>.journal but it is possible
// that we could have an empty journal file already created. If this happens, we don't want to create // that we could have an empty journal file already created. If this happens, we don't want to create

View File

@ -19,10 +19,13 @@ package org.apache.nifi.wali;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
public interface WriteAheadSnapshot<T> { public interface WriteAheadSnapshot<T> {
SnapshotCapture<T> prepareSnapshot(long maxTransactionId); SnapshotCapture<T> prepareSnapshot(long maxTransactionId);
SnapshotCapture<T> prepareSnapshot(long maxTransactionId, Set<String> swapLocations);
void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException; void writeSnapshot(SnapshotCapture<T> snapshot) throws IOException;
SnapshotRecovery<T> recover() throws IOException; SnapshotRecovery<T> recover() throws IOException;

View File

@ -65,7 +65,7 @@ public interface WriteAheadRepository<T> {
* if power is lost or the Operating System crashes * if power is lost or the Operating System crashes
* @throws IOException if failure to update repo * @throws IOException if failure to update repo
* @throws IllegalArgumentException if multiple records within the given * @throws IllegalArgumentException if multiple records within the given
* Collection have the same ID, as specified by {@link Record#getId()} * Collection have the same ID, as specified by {@link SerDe#getRecordIdentifier(Object)}
* method * method
* *
* @return the index of the Partition that performed the update * @return the index of the Partition that performed the update

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
/** /**
* Implementations must be thread safe * Implementations must be thread safe
* *
@ -128,4 +128,24 @@ public interface FlowFileRepository extends Closeable {
* @throws IOException if swap fails * @throws IOException if swap fails
*/ */
void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> flowFileRecords, FlowFileQueue flowFileQueue) throws IOException; void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> flowFileRecords, FlowFileQueue flowFileQueue) throws IOException;
/**
* <p>
* Determines whether or not the given swap location suffix is a valid, known location according to this FlowFileRepository. Note that while
* the {@link #swapFlowFilesIn(String, List, FlowFileQueue)} and {@link #swapFlowFilesOut(List, FlowFileQueue, String)} methods expect
* a full "swap location" this method expects only the "suffix" of a swap location. For example, if the location points to a file, this method
* would expect only the filename, not the full path.
* </p>
*
* <p>
* This method differs from the others because the other methods want to store the swap location or recover from a given location. However,
* this method is used to verify that the location is known. If for any reason, NiFi is stopped, its FlowFile Repository relocated to a new
* location (for example, a different disk partition), and restarted, the swap location would not match if we used the full location. Therefore,
* by using only the "suffix" (i.e. the filename for a file-based implementation), we can avoid worrying about relocation.
* </p>
*
* @param swapLocationSuffix the suffix of the location to check
* @return <code>true</code> if the swap location is known and valid, <code>false</code> otherwise
*/
boolean isValidSwapLocationSuffix(String swapLocationSuffix);
} }

View File

@ -239,6 +239,7 @@
<exclude>src/test/resources/bye.txt</exclude> <exclude>src/test/resources/bye.txt</exclude>
<exclude>src/test/resources/old-swap-file.swap</exclude> <exclude>src/test/resources/old-swap-file.swap</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude> <exclude>src/test/resources/xxe_template.xml</exclude>
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller; package org.apache.nifi.controller;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.FlowFileSwapManager;
@ -27,6 +28,8 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.SchemaSwapDeserializer; import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
import org.apache.nifi.controller.swap.SchemaSwapSerializer; import org.apache.nifi.controller.swap.SchemaSwapSerializer;
import org.apache.nifi.controller.swap.SimpleSwapDeserializer; import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.controller.swap.SwapDeserializer; import org.apache.nifi.controller.swap.SwapDeserializer;
import org.apache.nifi.controller.swap.SwapSerializer; import org.apache.nifi.controller.swap.SwapSerializer;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
@ -95,14 +98,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
public FileSystemSwapManager(final NiFiProperties nifiProperties) { public FileSystemSwapManager(final NiFiProperties nifiProperties) {
final Path flowFileRepoPath = nifiProperties.getFlowFileRepositoryPath(); this(nifiProperties.getFlowFileRepositoryPath());
}
public FileSystemSwapManager(final Path flowFileRepoPath) {
this.storageDirectory = flowFileRepoPath.resolve("swap").toFile(); this.storageDirectory = flowFileRepoPath.resolve("swap").toFile();
if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath()); throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath());
} }
} }
@Override @Override
public synchronized void initialize(final SwapManagerInitializationContext initializationContext) { public synchronized void initialize(final SwapManagerInitializationContext initializationContext) {
this.claimManager = initializationContext.getResourceClaimManager(); this.claimManager = initializationContext.getResourceClaimManager();
@ -152,6 +158,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
@Override @Override
public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
final File swapFile = new File(swapLocation); final File swapFile = new File(swapLocation);
final boolean validLocation = flowFileRepository.isValidSwapLocationSuffix(swapFile.getName());
if (!validLocation) {
warn("Cannot swap in FlowFiles from location " + swapLocation + " because the FlowFile Repository does not know about this Swap Location. " +
"This file should be manually removed. This typically occurs when a Swap File is written but the FlowFile Repository is not updated yet to reflect this. " +
"This is generally not a cause for concern, but may be indicative of a failure to update the FlowFile Repository.");
final SwapSummary swapSummary = new StandardSwapSummary(new QueueSize(0, 0), 0L, Collections.emptyList());
return new StandardSwapContents(swapSummary, Collections.emptyList());
}
final SwapContents swapContents = peek(swapLocation, flowFileQueue); final SwapContents swapContents = peek(swapLocation, flowFileQueue);
flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swapContents.getFlowFiles(), flowFileQueue); flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swapContents.getFlowFiles(), flowFileQueue);
@ -311,7 +327,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
Collections.sort(swapLocations, new SwapFileComparator()); swapLocations.sort(new SwapFileComparator());
return swapLocations; return swapLocations;
} }

View File

@ -180,19 +180,23 @@ public class SwappablePriorityQueue {
int flowFilesSwappedOut = 0; int flowFilesSwappedOut = 0;
final List<String> swapLocations = new ArrayList<>(numSwapFiles); final List<String> swapLocations = new ArrayList<>(numSwapFiles);
for (int i = 0; i < numSwapFiles; i++) { for (int i = 0; i < numSwapFiles; i++) {
long bytesSwappedThisIteration = 0L;
// Create a new swap file for the next SWAP_RECORD_POLL_SIZE records // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE); final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) { for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
final FlowFileRecord flowFile = tempQueue.poll(); final FlowFileRecord flowFile = tempQueue.poll();
toSwap.add(flowFile); toSwap.add(flowFile);
bytesSwappedOut += flowFile.getSize(); bytesSwappedThisIteration += flowFile.getSize();
flowFilesSwappedOut++;
} }
try { try {
Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue. Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName); final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName);
swapLocations.add(swapLocation); swapLocations.add(swapLocation);
bytesSwappedOut += bytesSwappedThisIteration;
flowFilesSwappedOut += toSwap.size();
} catch (final IOException ioe) { } catch (final IOException ioe) {
tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue. tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.

View File

@ -161,7 +161,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// so that we are able to aggregate many into a single Fork Event. // so that we are able to aggregate many into a single Fork Event.
private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>(); private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
private Checkpoint checkpoint = new Checkpoint(); private Checkpoint checkpoint = null;
private final ContentClaimWriteCache claimCache; private final ContentClaimWriteCache claimCache;
public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) { public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) {
@ -1489,7 +1489,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) { private void registerDequeuedRecord(final FlowFileRecord flowFile, final Connection connection) {
final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile); final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
records.put(flowFile.getId(), record);
// Ensure that the checkpoint does not have a FlowFile with the same ID already. This should not occur,
// but this is a safety check just to make sure, because if it were to occur, and we did process the FlowFile,
// we would have a lot of problems, since the map is keyed off of the FlowFile ID.
if (this.checkpoint != null) {
final StandardRepositoryRecord checkpointedRecord = this.checkpoint.getRecord(flowFile);
handleConflictingId(flowFile, connection, checkpointedRecord);
}
final StandardRepositoryRecord existingRecord = records.putIfAbsent(flowFile.getId(), record);
handleConflictingId(flowFile, connection, existingRecord); // Ensure that we have no conflicts
flowFilesIn++; flowFilesIn++;
contentSizeIn += flowFile.getSize(); contentSizeIn += flowFile.getSize();
@ -1503,6 +1514,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
incrementConnectionOutputCounts(connection, flowFile); incrementConnectionOutputCounts(connection, flowFile);
} }
/**
 * Aborts the dequeue of a FlowFile when the session already holds a record with the same FlowFile ID.
 * <p>
 * When {@code conflict} is {@code null} there is nothing to do. Otherwise the problem is logged, the
 * FlowFile is put back on the queue of the connection it was pulled from, the session is rolled back,
 * and a {@link FlowFileAccessException} is thrown.
 *
 * @param flowFile the FlowFile that was just pulled from the connection
 * @param connection the connection the FlowFile was pulled from
 * @param conflict the record already registered under the same FlowFile ID, or {@code null} if there is none
 * @throws FlowFileAccessException if {@code conflict} is not {@code null}
 */
private void handleConflictingId(final FlowFileRecord flowFile, final Connection connection, final StandardRepositoryRecord conflict) {
    if (conflict != null) {
        LOG.error("Attempted to pull {} from {} but the Session already has a FlowFile with the same ID ({}): {}, which was pulled from {}. "
                + "This means that the system has two FlowFiles with the same ID, which should not happen.",
            flowFile, connection, flowFile.getId(), conflict.getCurrent(), conflict.getOriginalQueue());

        // Return the FlowFile to its queue before rolling back, so that it is not lost.
        connection.getFlowFileQueue().put(flowFile);
        rollback(true, false);

        final String message = "Attempted to pull a FlowFile with ID " + flowFile.getId() + " from Connection "
            + connection + " but a FlowFile with that ID already exists in the session";
        throw new FlowFileAccessException(message);
    }
}
@Override @Override
public void adjustCounter(final String name, final long delta, final boolean immediate) { public void adjustCounter(final String name, final long delta, final boolean immediate) {
verifyTaskActive(); verifyTaskActive();

View File

@ -16,16 +16,16 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* <p> * <p>
* An in-memory implementation of the {@link FlowFileRepository}. Upon restart, all FlowFiles will be discarded, including those that have been swapped out by a {@link FlowFileSwapManager}. * An in-memory implementation of the {@link FlowFileRepository}. Upon restart, all FlowFiles will be discarded, including those that have been swapped out by a {@link FlowFileSwapManager}.
@ -137,4 +137,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
public void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue queue, String swapLocation) throws IOException { public void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue queue, String swapLocation) throws IOException {
} }
/**
 * Always reports the given swap location suffix as unknown: this implementation unconditionally
 * returns <code>false</code>, regardless of the argument.
 *
 * NOTE(review): FileSystemSwapManager#swapIn returns empty swap contents when this method returns
 * false, so pairing that swap manager with this repository would appear to make every swap-in yield
 * no FlowFiles - confirm that this is intended.
 *
 * @param swapLocationSuffix the suffix of the swap location to check (ignored)
 * @return <code>false</code> in all cases
 */
@Override
public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
return false;
}
} }

View File

@ -16,6 +16,19 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -42,19 +55,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;
/** /**
* <p> * <p>
* Implements FlowFile Repository using WALI as the backing store. * Implements FlowFile Repository using WALI as the backing store.
@ -101,6 +101,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private final int numPartitions; private final int numPartitions;
private final ScheduledExecutorService checkpointExecutor; private final ScheduledExecutorService checkpointExecutor;
// Suffixes of the swap locations currently known to this repository.
private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on the Set itself
// effectively final // effectively final
private WriteAheadRepository<RepositoryRecord> wal; private WriteAheadRepository<RepositoryRecord> wal;
private RepositoryRecordSerdeFactory serdeFactory; private RepositoryRecordSerdeFactory serdeFactory;
@ -134,7 +136,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
*/ */
public WriteAheadFlowFileRepository() { public WriteAheadFlowFileRepository() {
alwaysSync = false; alwaysSync = false;
checkpointDelayMillis = 0l; checkpointDelayMillis = 0L;
numPartitions = 0; numPartitions = 0;
checkpointExecutor = null; checkpointExecutor = null;
walImplementation = null; walImplementation = null;
@ -278,6 +280,13 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
return !resourceClaim.isInUse(); return !resourceClaim.isInUse();
} }
/**
 * Checks whether the given swap location suffix is currently recorded in this repository's set of
 * known swap location suffixes.
 *
 * @param swapLocationSuffix the suffix of the swap location to look up
 * @return <code>true</code> if the suffix is present in the set of known suffixes, <code>false</code> otherwise
 */
@Override
public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
    final boolean known;

    // The set is not thread-safe on its own; all access is guarded by its own monitor.
    synchronized (swapLocationSuffixes) {
        known = swapLocationSuffixes.contains(swapLocationSuffix);
    }

    return known;
}
private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException { private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
for (final RepositoryRecord record : records) { for (final RepositoryRecord record : records) {
if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING if (record.getType() != RepositoryRecordType.DELETE && record.getType() != RepositoryRecordType.CONTENTMISSING
@ -308,6 +317,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// This does not, however, cause problems, as ContentRepository should handle this // This does not, however, cause problems, as ContentRepository should handle this
// This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface. // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface.
final Set<ResourceClaim> claimsToAdd = new HashSet<>(); final Set<ResourceClaim> claimsToAdd = new HashSet<>();
final Set<String> swapLocationsAdded = new HashSet<>();
final Set<String> swapLocationsRemoved = new HashSet<>();
for (final RepositoryRecord record : records) { for (final RepositoryRecord record : records) {
if (record.getType() == RepositoryRecordType.DELETE) { if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if claim is destructible, mark it so // For any DELETE record that we have, if claim is destructible, mark it so
@ -324,6 +337,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) { if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) {
claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
} }
} else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
final String swapLocation = record.getSwapLocation();
swapLocationsAdded.add(swapLocation);
swapLocationsRemoved.remove(swapLocation);
} else if (record.getType() == RepositoryRecordType.SWAP_IN) {
final String swapLocation = record.getSwapLocation();
swapLocationsRemoved.add(swapLocation);
swapLocationsAdded.remove(swapLocation);
} }
final List<ContentClaim> transientClaims = record.getTransientClaims(); final List<ContentClaim> transientClaims = record.getTransientClaims();
@ -336,6 +357,14 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
} }
} }
// If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes.
if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
synchronized (swapLocationSuffixes) {
swapLocationsRemoved.forEach(loc -> swapLocationSuffixes.remove(getLocationSuffix(loc)));
swapLocationsAdded.forEach(loc -> swapLocationSuffixes.add(getLocationSuffix(loc)));
}
}
if (!claimsToAdd.isEmpty()) { if (!claimsToAdd.isEmpty()) {
// Get / Register a Set<ContentClaim> for the given Partiton Index // Get / Register a Set<ContentClaim> for the given Partiton Index
final Integer partitionKey = Integer.valueOf(partitionIndex); final Integer partitionKey = Integer.valueOf(partitionIndex);
@ -352,6 +381,20 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
} }
} }
/**
 * Returns the "suffix" of a swap location, i.e. the last path segment (the filename for a
 * file-based swap location).
 * <p>
 * Both {@code '/'} and {@code '\'} are treated as path separators. Swap locations are built from
 * absolute file paths, which use backslashes on Windows; without this, a Windows path would be
 * returned unchanged and would never match the bare filename that callers look up.
 * <p>
 * A single trailing separator is ignored, so {@code "/a/b/dir/"} yields {@code "dir"}. If the
 * location contains no separator, or nothing follows the last separator once the trailing one is
 * dropped, the (separator-normalized) location is returned as-is.
 *
 * @param swapLocation the full swap location, may be {@code null}
 * @return the last path segment of the location, or {@code null} if {@code swapLocation} is {@code null}
 */
protected static String getLocationSuffix(final String swapLocation) {
    if (swapLocation == null) {
        return null;
    }

    // Normalize Windows-style separators so that the suffix is found regardless of platform.
    final String normalized = swapLocation.replace('\\', '/');

    final String withoutTrailing = (normalized.endsWith("/") && normalized.length() > 1) ? normalized.substring(0, normalized.length() - 1) : normalized;
    final int lastIndex = withoutTrailing.lastIndexOf('/');
    if (lastIndex < 0 || lastIndex >= withoutTrailing.length() - 1) {
        return withoutTrailing;
    }

    return withoutTrailing.substring(lastIndex + 1);
}
@Override @Override
public void onSync(final int partitionIndex) { public void onSync(final int partitionIndex) {
final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex)); final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(Integer.valueOf(partitionIndex));
@ -407,6 +450,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// update WALI to indicate that the records were swapped out. // update WALI to indicate that the records were swapped out.
wal.update(repoRecords, true); wal.update(repoRecords, true);
synchronized (this.swapLocationSuffixes) {
this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
}
logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation}); logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation});
} }
@ -423,6 +470,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
} }
updateRepository(repoRecords, true); updateRepository(repoRecords, true);
synchronized (this.swapLocationSuffixes) {
this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
}
logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue}); logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
} }
@ -544,6 +596,12 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// Repo was written using that impl, that we properly recover from the implementation. // Repo was written using that impl, that we properly recover from the implementation.
Collection<RepositoryRecord> recordList = wal.recoverRecords(); Collection<RepositoryRecord> recordList = wal.recoverRecords();
final Set<String> recoveredSwapLocations = wal.getRecoveredSwapLocations();
synchronized (this.swapLocationSuffixes) {
recoveredSwapLocations.forEach(loc -> this.swapLocationSuffixes.add(getLocationSuffix(loc)));
logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
}
// If we didn't recover any records from our write-ahead log, attempt to recover records from the other implementation // If we didn't recover any records from our write-ahead log, attempt to recover records from the other implementation
// of the write-ahead log. We do this in case the user changed the "nifi.flowfile.repository.wal.impl" property. // of the write-ahead log. We do this in case the user changed the "nifi.flowfile.repository.wal.impl" property.
// In such a case, we still want to recover the records from the previous FlowFile Repository and write them into the new one. // In such a case, we still want to recover the records from the previous FlowFile Repository and write them into the new one.
@ -591,7 +649,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will
// return the appropriate number. // return the appropriate number.
flowFileSequenceGenerator.set(maxId + 1); flowFileSequenceGenerator.set(maxId + 1);
logger.info("Successfully restored {} FlowFiles", recordList.size() - numFlowFilesMissingQueue); logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
if (numFlowFilesMissingQueue > 0) { if (numFlowFilesMissingQueue > 0) {
logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue); logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
} }

View File

@ -17,16 +17,6 @@
package org.apache.nifi.controller; package org.apache.nifi.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
@ -39,6 +29,16 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.controller.swap.StandardSwapSummary;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
public class MockSwapManager implements FlowFileSwapManager { public class MockSwapManager implements FlowFileSwapManager {
public final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); public final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
public int swapOutCalledCount = 0; public int swapOutCalledCount = 0;
@ -49,10 +49,22 @@ public class MockSwapManager implements FlowFileSwapManager {
public int failSwapInAfterN = -1; public int failSwapInAfterN = -1;
public Throwable failSwapInFailure = null; public Throwable failSwapInFailure = null;
private int failSwapOutAfterN = -1;
private IOException failSwapOutFailure = null;
public void setSwapInFailure(final Throwable t) { public void setSwapInFailure(final Throwable t) {
this.failSwapInFailure = t; this.failSwapInFailure = t;
} }
/**
 * Arms swap-out failure starting at the given call count, without supplying a specific exception.
 * Delegates to the two-argument overload with a {@code null} failure.
 *
 * @param n the value of {@code swapOutCalledCount} at which {@code swapOut} starts to fail
 */
public void setSwapOutFailureOnNthIteration(final int n) {
    final IOException noSpecificFailure = null;
    setSwapOutFailureOnNthIteration(n, noSpecificFailure);
}

/**
 * Arms swap-out failure. Once armed with a value greater than -1, {@code swapOut} throws on every
 * call whose call count is at least {@code n} (not only on the n-th call).
 *
 * @param n the value of {@code swapOutCalledCount} at which {@code swapOut} starts to fail
 * @param failureException the exception to throw; if {@code null}, {@code swapOut} creates its own IOException
 */
public void setSwapOutFailureOnNthIteration(final int n, final IOException failureException) {
    failSwapOutFailure = failureException;
    failSwapOutAfterN = n;
}
@Override @Override
public void initialize(final SwapManagerInitializationContext initializationContext) { public void initialize(final SwapManagerInitializationContext initializationContext) {
@ -65,6 +77,12 @@ public class MockSwapManager implements FlowFileSwapManager {
@Override @Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException { public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
swapOutCalledCount++; swapOutCalledCount++;
if (failSwapOutAfterN > -1 && swapOutCalledCount >= failSwapOutAfterN) {
final IOException ioe = failSwapOutFailure == null ? new IOException("Intentional Unit Test IOException on swap out call number " + swapOutCalledCount) : failSwapOutFailure;
throw ioe;
}
final String location = UUID.randomUUID().toString() + "." + partitionName; final String location = UUID.randomUUID().toString() + "." + partitionName;
swappedOut.put(location, new ArrayList<>(flowFiles)); swappedOut.put(location, new ArrayList<>(flowFiles));
return location; return location;

View File

@ -16,18 +16,6 @@
*/ */
package org.apache.nifi.controller; package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
@ -36,9 +24,30 @@ import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
public class TestFileSystemSwapManager { public class TestFileSystemSwapManager {
@Test @Test
@ -48,7 +57,7 @@ public class TestFileSystemSwapManager {
final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) { final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final FileSystemSwapManager swapManager = createSwapManager(); final FileSystemSwapManager swapManager = createSwapManager();
final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue); final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
@ -63,11 +72,49 @@ public class TestFileSystemSwapManager {
} }
} }
@Test
public void testFailureOnRepoSwapOut() throws IOException {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
private FileSystemSwapManager createSwapManager() { final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
final FileSystemSwapManager swapManager = new FileSystemSwapManager(); Mockito.doThrow(new IOException("Intentional IOException for unit test"))
.when(flowFileRepo).updateRepository(anyCollection());
final FileSystemSwapManager swapManager = createSwapManager();
final List<FlowFileRecord> flowFileRecords = new ArrayList<>();
for (int i=0; i < 10000; i++) {
flowFileRecords.add(new MockFlowFileRecord(i));
}
try {
swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1");
Assert.fail("Expected IOException");
} catch (final IOException ioe) {
// expected
}
}
@Test
public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("");
final File targetDir = new File("target/swap");
targetDir.mkdirs();
final File targetFile = new File(targetDir, "444-old-swap-file.swap");
final File originalSwapFile = new File("src/test/resources/swap/444-old-swap-file.swap");
try (final OutputStream fos = new FileOutputStream(targetFile);
final InputStream fis = new FileInputStream(originalSwapFile)) {
StreamUtils.copy(fis, fos);
}
final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target"));
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager(); final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class); final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
swapManager.initialize(new SwapManagerInitializationContext() { swapManager.initialize(new SwapManagerInitializationContext() {
@Override @Override
public ResourceClaimManager getResourceClaimManager() { public ResourceClaimManager getResourceClaimManager() {
@ -76,7 +123,46 @@ public class TestFileSystemSwapManager {
@Override @Override
public FlowFileRepository getFlowFileRepository() { public FlowFileRepository getFlowFileRepository() {
return flowfileRepo; return flowFileRepo;
}
@Override
public EventReporter getEventReporter() {
return EventReporter.NO_OP;
}
});
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(false);
final List<String> recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null);
assertEquals(1, recoveredLocations.size());
final String firstLocation = recoveredLocations.get(0);
final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue);
assertEquals(0, emptyContents.getFlowFiles().size());
when(flowFileRepo.isValidSwapLocationSuffix(anyString())).thenReturn(true);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final SwapContents contents = swapManager.swapIn(firstLocation, flowFileQueue);
assertEquals(10000, contents.getFlowFiles().size());
}
/**
 * Creates a swap manager that is backed by a freshly mocked FlowFileRepository.
 *
 * @return the swap manager produced by {@link #createSwapManager(FlowFileRepository)}
 */
private FileSystemSwapManager createSwapManager() {
    return createSwapManager(Mockito.mock(FlowFileRepository.class));
}
private FileSystemSwapManager createSwapManager(final FlowFileRepository flowFileRepo) {
final FileSystemSwapManager swapManager = new FileSystemSwapManager();
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
swapManager.initialize(new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public FlowFileRepository getFlowFileRepository() {
return flowFileRepo;
} }
@Override @Override

View File

@ -17,21 +17,6 @@
package org.apache.nifi.controller.queue.clustered; package org.apache.nifi.controller.queue.clustered;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.controller.MockFlowFileRecord; import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.MockSwapManager; import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.queue.DropFlowFileAction; import org.apache.nifi.controller.queue.DropFlowFileAction;
@ -42,16 +27,34 @@ import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestSwappablePriorityQueue { public class TestSwappablePriorityQueue {
private MockSwapManager swapManager; private MockSwapManager swapManager;
private final EventReporter eventReporter = EventReporter.NO_OP; private final List<String> events = new ArrayList<>();
private EventReporter eventReporter;
private final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); private final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
private final DropFlowFileAction dropAction = (flowFiles, requestor) -> { private final DropFlowFileAction dropAction = (flowFiles, requestor) -> {
return new QueueSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); return new QueueSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
@ -63,11 +66,35 @@ public class TestSwappablePriorityQueue {
public void setup() { public void setup() {
swapManager = new MockSwapManager(); swapManager = new MockSwapManager();
events.clear();
eventReporter = new EventReporter() {
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
events.add(message);
}
};
when(flowFileQueue.getIdentifier()).thenReturn("unit-test"); when(flowFileQueue.getIdentifier()).thenReturn("unit-test");
queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local"); queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local");
} }
/**
 * Verifies that the reported queue size still counts every FlowFile that was put on the queue
 * when the swap manager is set to fail swap-out, and that exactly one event is reported.
 */
@Test
public void testSwapOutFailureLeavesCorrectQueueSize() {
    // Make the mock swap manager throw from the very first swapOut call onward.
    swapManager.setSwapOutFailureOnNthIteration(1, null);

    final int initialCount = 19999;
    int nextId = 0;
    while (nextId < initialCount) {
        queue.put(new MockFlowFile(nextId));
        nextId++;
    }

    // Nothing has been reported so far and every FlowFile is accounted for.
    assertEquals(initialCount, queue.size().getObjectCount());
    assertEquals(0, events.size());

    // One more FlowFile brings the total to 20,000.
    queue.put(new MockFlowFile(20000));

    assertEquals(20000, queue.size().getObjectCount());
    assertEquals(1, events.size()); // Expect a single failure event to be emitted
}
@Test @Test
public void testPrioritizer() { public void testPrioritizer() {
final FlowFilePrioritizer prioritizer = (o1, o2) -> Long.compare(o1.getId(), o2.getId()); final FlowFilePrioritizer prioritizer = (o1, o2) -> Long.compare(o1.getId(), o2.getId());

View File

@ -277,14 +277,13 @@ public class TestStandardProcessSession {
connList.add(conn1); connList.add(conn1);
connList.add(conn2); connList.add(conn2);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
.id(1000L) .id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis());
.build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecordBuilder.build());
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecordBuilder.id(1001).build());
when(connectable.getIncomingConnections()).thenReturn(connList); when(connectable.getIncomingConnections()).thenReturn(connList);
@ -295,6 +294,36 @@ public class TestStandardProcessSession {
verify(conn2, times(1)).poll(any(Set.class)); verify(conn2, times(1)).poll(any(Set.class));
} }
/**
 * Verifies that, after the session has obtained and transferred one FlowFile, a second
 * {@code session.get()} fails with a FlowFileAccessException when the next queued FlowFile
 * has the same ID.
 */
@Test
public void testHandlingOfMultipleFlowFilesWithSameId() {
    // Enqueue two separately built records that share ID 1000.
    for (int enqueued = 0; enqueued < 2; enqueued++) {
        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
        builder.id(1000L);
        builder.addAttribute("uuid", "12345678-1234-1234-1234-123456789012");
        builder.entryDate(System.currentTimeMillis());
        builder.size(0L);
        flowFileQueue.put(builder.build());
    }

    final Relationship relationship = new Relationship.Builder().name("A").build();

    final FlowFile first = session.get();
    assertNotNull(first);
    session.transfer(first, relationship);

    boolean rejected = false;
    try {
        session.get();
    } catch (final FlowFileAccessException expected) {
        // Expected
        rejected = true;
    }

    if (!rejected) {
        Assert.fail("Should not have been able to poll second FlowFile with same ID");
    }
}
@Test @Test
public void testCloneOriginalDataSmaller() throws IOException { public void testCloneOriginalDataSmaller() throws IOException {
final byte[] originalContent = "hello".getBytes(); final byte[] originalContent = "hello".getBytes();
@ -416,14 +445,14 @@ public class TestStandardProcessSession {
connList.add(conn1); connList.add(conn1);
connList.add(conn2); connList.add(conn2);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
final StandardFlowFileRecord.Builder flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L) .id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis());
.build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord.build());
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord.id(1001).build());
when(connectable.getIncomingConnections()).thenReturn(connList); when(connectable.getIncomingConnections()).thenReturn(connList);
@ -475,14 +504,13 @@ public class TestStandardProcessSession {
connList.add(conn1); connList.add(conn1);
connList.add(conn2); connList.add(conn2);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder()
.id(1000L) .id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis());
.build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecordBuilder.build());
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecordBuilder.id(10001L).build());
when(connectable.getIncomingConnections()).thenReturn(connList); when(connectable.getIncomingConnections()).thenReturn(connList);
@ -1383,10 +1411,11 @@ public class TestStandardProcessSession {
@Test @Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() { public void testContentNotFoundExceptionThrownWhenUnableToReadDataStreamCallbackOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .id(1)
.entryDate(System.currentTimeMillis()) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.build(); .build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get(); FlowFile ff1 = session.get();
@ -1399,12 +1428,13 @@ public class TestStandardProcessSession {
session.commit(); session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .id(2)
.entryDate(System.currentTimeMillis()) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(1000L) .contentClaimOffset(1000L)
.size(1000L) .size(1000L)
.build(); .build();
flowFileQueue.put(flowFileRecord2); flowFileQueue.put(flowFileRecord2);
// attempt to read the data. // attempt to read the data.
@ -1424,10 +1454,11 @@ public class TestStandardProcessSession {
@Test @Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .id(1)
.entryDate(System.currentTimeMillis()) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.build(); .build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -1441,10 +1472,11 @@ public class TestStandardProcessSession {
session.commit(); session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .id(2)
.entryDate(System.currentTimeMillis()) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(1000L).size(1L).build(); .contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2); flowFileQueue.put(flowFileRecord2);
// attempt to read the data. // attempt to read the data.
@ -1544,11 +1576,15 @@ public class TestStandardProcessSession {
@Test @Test
public void testRollbackAfterCheckpoint() { public void testRollbackAfterCheckpoint() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final StandardFlowFileRecord.Builder recordBuilder = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .id(1)
.entryDate(System.currentTimeMillis()) .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L))
.contentClaimOffset(0L).size(0L).build(); .contentClaimOffset(0L)
.size(0L);
final FlowFileRecord flowFileRecord = recordBuilder.build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
final FlowFile originalFlowFile = session.get(); final FlowFile originalFlowFile = session.get();
@ -1574,7 +1610,7 @@ public class TestStandardProcessSession {
session.rollback(); session.rollback();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(recordBuilder.id(2).build());
assertFalse(flowFileQueue.isActiveQueueEmpty()); assertFalse(flowFileQueue.isActiveQueueEmpty());
final FlowFile originalRound2 = session.get(); final FlowFile originalRound2 = session.get();
@ -1596,8 +1632,8 @@ public class TestStandardProcessSession {
session.commit(); session.commit();
// FlowFile transferred back to queue // FlowFiles transferred back to queue
assertEquals(1, flowFileQueue.size().getObjectCount()); assertEquals(2, flowFileQueue.size().getObjectCount());
assertFalse(flowFileQueue.isUnacknowledgedFlowFile()); assertFalse(flowFileQueue.isUnacknowledgedFlowFile());
assertFalse(flowFileQueue.isActiveQueueEmpty()); assertFalse(flowFileQueue.isActiveQueueEmpty());
} }
@ -2116,6 +2152,11 @@ public class TestStandardProcessSession {
@Override @Override
public void initialize(ResourceClaimManager claimManager) throws IOException { public void initialize(ResourceClaimManager claimManager) throws IOException {
} }
/**
 * Stub implementation: reports every swap location suffix as not valid.
 *
 * @param swapLocationSuffix the suffix to check; ignored by this stub
 * @return always {@code false}
 */
@Override
public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
return false;
}
} }
private static class MockContentRepository implements ContentRepository { private static class MockContentRepository implements ContentRepository {

View File

@ -70,6 +70,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -396,6 +397,101 @@ public class TestWriteAheadFlowFileRepository {
} }
/**
 * Checks {@code WriteAheadFlowFileRepository.getLocationSuffix} against a table of
 * {input, expected suffix} pairs.
 */
@Test
public void testGetLocationSuffix() {
    final String[][] inputAndExpected = {
        {"/", "/"},
        {"", ""},
        {null, null},
        {"test.txt", "test.txt"},
        {"/test.txt", "test.txt"},
        {"/tmp/test.txt", "test.txt"},
        {"//test.txt", "test.txt"},
        {"/path/to/other/file/repository/test.txt", "test.txt"},
        {"test.txt/", "test.txt"},
        {"/path/to/test.txt/", "test.txt"},
    };

    for (final String[] pair : inputAndExpected) {
        final String input = pair[0];
        final String expected = pair[1];
        assertEquals(expected, WriteAheadFlowFileRepository.getLocationSuffix(input));
    }
}
/**
 * Writes a record that was created with the swap location "swap123", closes the repository, and
 * then verifies that a second repository instance reports "swap123" as a valid swap location
 * suffix after loading, while an unrelated suffix is reported as not valid.
 * <p>
 * Each repository is closed in a {@code finally} block so that a failing call or assertion does
 * not leave it open.
 */
@Test
public void testSwapLocationsRestored() throws IOException {
    final Path path = Paths.get("target/test-swap-repo");
    if (Files.exists(path)) {
        FileUtils.deleteFile(path.toFile(), true);
    }

    final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
    repo.initialize(new StandardResourceClaimManager());

    final TestQueueProvider queueProvider = new TestQueueProvider();
    try {
        repo.loadFlowFiles(queueProvider, 0L);

        final Connection connection = Mockito.mock(Connection.class);
        when(connection.getIdentifier()).thenReturn("1234");

        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
        when(queue.getIdentifier()).thenReturn("1234");
        when(connection.getFlowFileQueue()).thenReturn(queue);

        queueProvider.addConnection(connection);

        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
        ffBuilder.id(1L);
        ffBuilder.size(0L);
        final FlowFileRecord flowFileRecord = ffBuilder.build();

        final List<RepositoryRecord> records = new ArrayList<>();
        final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "swap123");
        record.setDestination(queue);
        records.add(record);

        repo.updateRepository(records);
    } finally {
        repo.close();
    }

    // restore
    final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
    repo2.initialize(new StandardResourceClaimManager());
    try {
        repo2.loadFlowFiles(queueProvider, 0L);
        assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
        assertFalse(repo2.isValidSwapLocationSuffix("other"));
    } finally {
        repo2.close();
    }
}
/**
 * Verifies that "swap123" is not a valid swap location suffix before the repository is updated,
 * and becomes valid once a record created with the swap location "/tmp/swap123" has been written.
 * <p>
 * The repository is closed in a {@code finally} block so that a failing call or assertion does
 * not leave it open.
 */
@Test
public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
    final Path path = Paths.get("target/test-swap-repo");
    if (Files.exists(path)) {
        FileUtils.deleteFile(path.toFile(), true);
    }

    final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
    repo.initialize(new StandardResourceClaimManager());
    try {
        final TestQueueProvider queueProvider = new TestQueueProvider();
        repo.loadFlowFiles(queueProvider, 0L);

        final Connection connection = Mockito.mock(Connection.class);
        when(connection.getIdentifier()).thenReturn("1234");

        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
        when(queue.getIdentifier()).thenReturn("1234");
        when(connection.getFlowFileQueue()).thenReturn(queue);

        queueProvider.addConnection(connection);

        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
        ffBuilder.id(1L);
        ffBuilder.size(0L);
        final FlowFileRecord flowFileRecord = ffBuilder.build();

        final List<RepositoryRecord> records = new ArrayList<>();
        final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord, "/tmp/swap123");
        record.setDestination(queue);
        records.add(record);

        // The suffix must be unknown until the update has been applied.
        assertFalse(repo.isValidSwapLocationSuffix("swap123"));
        repo.updateRepository(records);
        assertTrue(repo.isValidSwapLocationSuffix("swap123"));
    } finally {
        repo.close();
    }
}
@Test @Test
public void testResourceClaimsIncremented() throws IOException { public void testResourceClaimsIncremented() throws IOException {