NIFI-5997: Recover FlowFile Repository before swap files; then, when recovering swap files, ignore any that are unknown to the flowfile repo. This prevents us from incrementing the size of the flowfile queue for unknown swap files

This closes #3292.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-02-05 11:24:42 -05:00 committed by Bryan Bende
parent 83ac191736
commit 412c4908e2
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
7 changed files with 62 additions and 16 deletions

View File

@ -78,13 +78,11 @@ public interface FlowFileRepository extends Closeable {
*
* @param queueProvider the provider of FlowFile Queues into which the
* FlowFiles should be enqueued
* @param minimumSequenceNumber specifies the minimum value that should be
* returned by a call to {@link #getNextFlowFileSequence()}
*
* @return index of highest flow file identifier
* @throws IOException if load fails
*/
long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException;
long loadFlowFiles(QueueProvider queueProvider) throws IOException;
/**
* @return <code>true</code> if the Repository is volatile (i.e., its data
@ -104,6 +102,13 @@ public interface FlowFileRepository extends Closeable {
*/
long getMaxFlowFileIdentifier() throws IOException;
/**
* Notifies the FlowFile Repository that the given identifier has been identified as the maximum value that
* has been encountered for an 'external' (swapped out) FlowFile.
* @param maxId the max id of any FlowFile encountered
*/
void updateMaxFlowFileIdentifier(long maxId);
/**
* Updates the Repository to indicate that the given FlowFileRecords were
* Swapped Out of memory

View File

@ -298,6 +298,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
final boolean validLocation = flowFileRepository.isValidSwapLocationSuffix(swapFile.getName());
if (!validLocation) {
logger.warn("Encountered unknown Swap File {}; will ignore this Swap File. This file should be cleaned up manually", swapFile);
continue;
}
swapLocations.add(swapFile.getAbsolutePath());
continue;
}

View File

@ -748,6 +748,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// get all connections/queues and recover from swap files.
final List<Connection> connections = flowManager.getRootGroup().findAllConnections();
flowFileRepository.loadFlowFiles(new StandardQueueProvider(this));
long maxIdFromSwapFiles = -1L;
if (flowFileRepository.isVolatile()) {
for (final Connection connection : connections) {
@ -771,7 +773,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}
flowFileRepository.loadFlowFiles(new StandardQueueProvider(this), maxIdFromSwapFiles + 1);
flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1);
// Begin expiring FlowFiles that are old
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,

View File

@ -114,11 +114,25 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
}
@Override
public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException {
idGenerator.set(minimumSequenceNumber);
public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
return 0;
}
@Override
public void updateMaxFlowFileIdentifier(final long maxId) {
while (true) {
final long currentId = idGenerator.get();
if (currentId >= maxId) {
return;
}
final boolean updated = idGenerator.compareAndSet(currentId, maxId);
if (updated) {
return;
}
}
}
@Override
public long getNextFlowFileSequence() {
return idGenerator.getAndIncrement();

View File

@ -585,7 +585,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
@Override
public long loadFlowFiles(final QueueProvider queueProvider, final long minimumSequenceNumber) throws IOException {
public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
final Map<String, FlowFileQueue> queueMap = new HashMap<>();
for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
queueMap.put(queue.getIdentifier(), queue);
@ -630,7 +630,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// Determine the next sequence number for FlowFiles
int numFlowFilesMissingQueue = 0;
long maxId = minimumSequenceNumber;
long maxId = 0;
for (final RepositoryRecord record : recordList) {
final long recordId = serdeFactory.getRecordIdentifier(record);
if (recordId > maxId) {
@ -675,6 +675,21 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
return maxId;
}
@Override
public void updateMaxFlowFileIdentifier(final long maxId) {
while (true) {
final long currentId = flowFileSequenceGenerator.get();
if (currentId >= maxId) {
return;
}
final boolean updated = flowFileSequenceGenerator.compareAndSet(currentId, maxId);
if (updated) {
return;
}
}
}
@Override
public long getNextFlowFileSequence() {
return flowFileSequenceGenerator.getAndIncrement();

View File

@ -2104,6 +2104,10 @@ public class TestStandardProcessSession {
return 0L;
}
@Override
public void updateMaxFlowFileIdentifier(final long maxId) {
}
@Override
public void updateRepository(Collection<RepositoryRecord> records) throws IOException {
if (failOnUpdate) {
@ -2137,7 +2141,7 @@ public class TestStandardProcessSession {
}
@Override
public long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException {
public long loadFlowFiles(QueueProvider queueProvider) throws IOException {
return 0;
}

View File

@ -422,7 +422,7 @@ public class TestWriteAheadFlowFileRepository {
repo.initialize(new StandardResourceClaimManager());
final TestQueueProvider queueProvider = new TestQueueProvider();
repo.loadFlowFiles(queueProvider, 0L);
repo.loadFlowFiles(queueProvider);
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
@ -449,7 +449,7 @@ public class TestWriteAheadFlowFileRepository {
// restore
final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider, 0L);
repo2.loadFlowFiles(queueProvider);
assertTrue(repo2.isValidSwapLocationSuffix("swap123"));
assertFalse(repo2.isValidSwapLocationSuffix("other"));
repo2.close();
@ -466,7 +466,7 @@ public class TestWriteAheadFlowFileRepository {
repo.initialize(new StandardResourceClaimManager());
final TestQueueProvider queueProvider = new TestQueueProvider();
repo.loadFlowFiles(queueProvider, 0L);
repo.loadFlowFiles(queueProvider);
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
@ -519,7 +519,7 @@ public class TestWriteAheadFlowFileRepository {
// resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null))) {
repo.initialize(claimManager);
repo.loadFlowFiles(queueProvider, -1L);
repo.loadFlowFiles(queueProvider);
// Create a Repository Record that indicates that a FlowFile was created
final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
@ -554,7 +554,7 @@ public class TestWriteAheadFlowFileRepository {
final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null))) {
repo.initialize(recoveryClaimManager);
final long largestId = repo.loadFlowFiles(queueProvider, 0L);
final long largestId = repo.loadFlowFiles(queueProvider);
// largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
assertEquals(1, largestId);
@ -587,7 +587,7 @@ public class TestWriteAheadFlowFileRepository {
repo.initialize(new StandardResourceClaimManager());
final TestQueueProvider queueProvider = new TestQueueProvider();
repo.loadFlowFiles(queueProvider, 0L);
repo.loadFlowFiles(queueProvider);
final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
@ -639,7 +639,7 @@ public class TestWriteAheadFlowFileRepository {
// restore
final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(NiFiProperties.createBasicNiFiProperties(null, null));
repo2.initialize(new StandardResourceClaimManager());
repo2.loadFlowFiles(queueProvider, 0L);
repo2.loadFlowFiles(queueProvider);
assertEquals(1, flowFileCollection.size());
final FlowFileRecord flowFile = flowFileCollection.get(0);