NIFI-1527: Ensure that we increment Claimant Counts for content claims that are referenced by Swapped-Out FlowFiles on restart of nifi

This commit is contained in:
Mark Payne 2016-02-18 11:14:35 -05:00
parent 1c73d9090a
commit 3bb18b9653
10 changed files with 418 additions and 99 deletions

View File

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
@ -40,16 +41,14 @@ public interface FlowFileQueue {
List<FlowFilePrioritizer> getPriorities();
/**
* Reads any Swap Files that belong to this queue and increments counts so that the size
* of the queue will reflect the size of all FlowFiles regardless of whether or not they are
* swapped out. This will be called only during NiFi startup as an initialization step. This
* method is then responsible for returning the largest ID of any FlowFile that is swapped
* Reads any Swap Files that belong to this queue and returns a summary of what is swapped out.
* This will be called only during NiFi startup as an initialization step. This
* method is then responsible for returning a FlowFileSummary of the FlowFiles that are swapped
* out, or <code>null</code> if no FlowFiles are swapped out for this queue.
*
* @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if
* no FlowFiles are swapped out for this queue.
* @return a SwapSummary that describes the FlowFiles that exist in the queue but are swapped out.
*/
Long recoverSwappedFlowFiles();
SwapSummary recoverSwappedFlowFiles();
/**
* Destroys any Swap Files that exist for this queue without updating the FlowFile Repository

View File

@ -22,9 +22,9 @@ import java.text.NumberFormat;
*
*/
public class QueueSize {
private final int objectCount;
private final long totalSizeBytes;
private final int hashCode;
public QueueSize(final int numberObjects, final long totalSizeBytes) {
if (numberObjects < 0 || totalSizeBytes < 0) {
@ -32,6 +32,7 @@ public class QueueSize {
}
objectCount = numberObjects;
this.totalSizeBytes = totalSizeBytes;
hashCode = (int) (41 + 47 * objectCount + 51 * totalSizeBytes);
}
/**
@ -66,4 +67,27 @@ public class QueueSize {
public String toString() {
return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof QueueSize)) {
return false;
}
final QueueSize other = (QueueSize) obj;
return getObjectCount() == other.getObjectCount() && getByteCount() == other.getByteCount();
}
}

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
/**
* Defines a mechanism by which FlowFiles can be move into external storage or
@ -90,20 +89,14 @@ public interface FlowFileSwapManager {
List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
/**
* Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location
* Parses the contents of the swap file at the given location and provides a SwapSummary that provides
* pertinent information about the information stored within the swap file
*
* @param swapLocation the location of the swap file
* @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out
* @return a SwapSummary that provides information about what is contained within the swap file
* @throws IOException if unable to read or parse the swap file
*/
QueueSize getSwapSize(String swapLocation) throws IOException;
/**
* Returns the maximum record id of the FlowFiles stored at the given swap location
*
* @param swapLocation the swap location to read id's from
* @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found
*/
Long getMaxRecordId(String swapLocation) throws IOException;
SwapSummary getSwapSummary(String swapLocation) throws IOException;
/**
* Purge all known Swap Files without updating FlowFileRepository or Provenance Repository

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import java.util.List;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
/**
* <p>
* Provides a summary of the information that is stored in a FlowFile swap file.
* </p>
*/
public interface SwapSummary {
/**
* @return a QueueSize that represents the number of FlowFiles are in the swap file and their
* aggregate content size
*/
QueueSize getQueueSize();
/**
* @return the largest ID of any of the FlowFiles that are contained in the swap file
*/
Long getMaxFlowFileId();
/**
* Returns a List of all ResoruceClaims that are referenced by the FlowFiles in the swap file.
* This List may well contain the same ResourceClaim many times. This indicates that many FlowFiles
* reference the same ResourceClaim.
*
* @return a List of all ResourceClaims that are referenced by the FlowFiles in the swap file
*/
List<ResourceClaim> getResourceClaims();
}

View File

@ -47,10 +47,12 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
@ -237,8 +239,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
return swapLocations;
}
@SuppressWarnings("deprecation")
@Override
public QueueSize getSwapSize(final String swapLocation) throws IOException {
public SwapSummary getSwapSummary(final String swapLocation) throws IOException {
final File swapFile = new File(swapLocation);
// read record from disk via the swap file
@ -259,56 +262,36 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final int numRecords = in.readInt();
final long contentSize = in.readLong();
return new QueueSize(numRecords, contentSize);
}
}
@Override
public Long getMaxRecordId(final String swapLocation) throws IOException {
final File swapFile = new File(swapLocation);
// read record from disk via the swap file
try (final InputStream fis = new FileInputStream(swapFile);
final InputStream bufferedIn = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bufferedIn)) {
final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
throw new IOException(errMsg);
}
in.readUTF(); // ignore connection id
final int numRecords = in.readInt();
in.readLong(); // ignore content size
if (numRecords == 0) {
return null;
return StandardSwapSummary.EMPTY_SUMMARY;
}
Long maxRecordId = null;
if (swapEncodingVersion > 7) {
final long maxRecordId = in.readLong();
return maxRecordId;
maxRecordId = in.readLong();
}
// Before swap encoding version 8, we did not write out the max record id, so we have to read all
// swap files to determine the max record id
final List<ResourceClaim> resourceClaims = new ArrayList<>(numRecords);
final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, swapEncodingVersion, true, claimManager);
long maxId = 0L;
for (final FlowFileRecord record : records) {
if (record.getId() > maxId) {
maxId = record.getId();
if (maxRecordId == null || record.getId() > maxRecordId) {
maxRecordId = record.getId();
}
final ContentClaim contentClaim = record.getContentClaim();
if (contentClaim != null) {
resourceClaims.add(contentClaim.getResourceClaim());
}
}
return maxId;
return new StandardSwapSummary(new QueueSize(numRecords, contentSize), maxRecordId, resourceClaims);
}
}
@SuppressWarnings("deprecation")
public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
if (toSwap == null || toSwap.isEmpty()) {
return 0;
@ -636,5 +619,4 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
}
}

View File

@ -73,6 +73,7 @@ import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
@ -591,9 +592,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else {
for (final Connection connection : connections) {
final FlowFileQueue queue = connection.getFlowFileQueue();
final Long maxFlowFileId = queue.recoverSwappedFlowFiles();
if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
maxIdFromSwapFiles = maxFlowFileId;
final SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
if (swapSummary != null) {
final Long maxFlowFileId = swapSummary.getMaxFlowFileId();
if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
maxIdFromSwapFiles = maxFlowFileId;
}
for (final ResourceClaim resourceClaim : swapSummary.getResourceClaims()) {
resourceClaimManager.incrementClaimantCount(resourceClaim);
}
}
}
}

View File

@ -49,9 +49,11 @@ import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@ -787,10 +789,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
public Long recoverSwappedFlowFiles() {
public SwapSummary recoverSwappedFlowFiles() {
int swapFlowFileCount = 0;
long swapByteCount = 0L;
Long maxId = null;
List<ResourceClaim> resourceClaims = new ArrayList<>();
writeLock.lock();
try {
@ -809,8 +812,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
for (final String swapLocation : swapLocations) {
try {
final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
final SwapSummary summary = swapManager.getSwapSummary(swapLocation);
final QueueSize queueSize = summary.getQueueSize();
final Long maxSwapRecordId = summary.getMaxFlowFileId();
if (maxSwapRecordId != null) {
if (maxId == null || maxSwapRecordId > maxId) {
maxId = maxSwapRecordId;
@ -819,6 +823,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swapFlowFileCount += queueSize.getObjectCount();
swapByteCount += queueSize.getByteCount();
resourceClaims.addAll(summary.getResourceClaims());
} catch (final IOException ioe) {
logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
logger.error("", ioe);
@ -835,7 +840,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("Recover Swap Files");
}
return maxId;
return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.swap;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
public class StandardSwapSummary implements SwapSummary {
public static final SwapSummary EMPTY_SUMMARY = new StandardSwapSummary(new QueueSize(0, 0L), null, Collections.<ResourceClaim> emptyList());
private final QueueSize queueSize;
private final Long maxFlowFileId;
private final List<ResourceClaim> resourceClaims;
public StandardSwapSummary(final QueueSize queueSize, final Long maxFlowFileId, final List<ResourceClaim> resourceClaims) {
this.queueSize = queueSize;
this.maxFlowFileId = maxFlowFileId;
this.resourceClaims = Collections.unmodifiableList(resourceClaims);
}
@Override
public QueueSize getQueueSize() {
return queueSize;
}
@Override
public Long getMaxFlowFileId() {
return maxFlowFileId;
}
@Override
public List<ResourceClaim> getResourceClaims() {
return resourceClaims;
}
}

View File

@ -46,8 +46,11 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -472,37 +475,30 @@ public class TestStandardFlowFileQueue {
}
@Override
public QueueSize getSwapSize(String swapLocation) throws IOException {
@SuppressWarnings("deprecation")
public SwapSummary getSwapSummary(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
if (flowFiles == null) {
return new QueueSize(0, 0L);
return StandardSwapSummary.EMPTY_SUMMARY;
}
int count = 0;
long size = 0L;
Long max = null;
final List<ResourceClaim> resourceClaims = new ArrayList<>();
for (final FlowFileRecord flowFile : flowFiles) {
count++;
size += flowFile.getSize();
}
return new QueueSize(count, size);
}
@Override
public Long getMaxRecordId(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
if (flowFiles == null) {
return null;
}
Long max = null;
for (final FlowFileRecord flowFile : flowFiles) {
if (max == null || flowFile.getId() > max) {
max = flowFile.getId();
}
if (flowFile.getContentClaim() != null) {
resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
}
}
return max;
return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
}
@Override
@ -588,6 +584,7 @@ public class TestStandardFlowFileQueue {
}
@Override
@SuppressWarnings("deprecation")
public int compareTo(final FlowFile o) {
return Long.compare(id, o.getId());
}

View File

@ -16,27 +16,40 @@
*/
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -44,9 +57,106 @@ import org.mockito.stubbing.Answer;
public class TestWriteAheadFlowFileRepository {
@BeforeClass
public static void setupProperties() {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
}
@Before
public void clearRepo() throws IOException {
final File target = new File("target");
final File testRepo = new File(target, "test-repo");
if (testRepo.exists()) {
FileUtils.deleteFile(testRepo, true);
}
}
@Test
public void testResourceClaimsIncremented() throws IOException {
final ResourceClaimManager claimManager = new StandardResourceClaimManager();
final TestQueueProvider queueProvider = new TestQueueProvider();
final Connection connection = Mockito.mock(Connection.class);
when(connection.getIdentifier()).thenReturn("1234");
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, null);
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
final ResourceClaim resourceClaim1 = new StandardResourceClaim("container", "section", "1", false);
final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
final ResourceClaim resourceClaim2 = new StandardResourceClaim("container", "section", "2", false);
final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
// Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,
// indicate that a FlowFile was swapped out. We should then be able to recover these FlowFiles and the
// resource claims' counts should be updated for both the swapped out FlowFile and the non-swapped out FlowFile
try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository()) {
repo.initialize(claimManager);
repo.loadFlowFiles(queueProvider, -1L);
// Create a Repository Record that indicates that a FlowFile was created
final FlowFileRecord flowFile1 = new StandardFlowFileRecord.Builder()
.id(1L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
.contentClaim(claim1)
.build();
final StandardRepositoryRecord rec1 = new StandardRepositoryRecord(queue);
rec1.setWorking(flowFile1);
rec1.setDestination(queue);
// Create a Record that we can swap out
final FlowFileRecord flowFile2 = new StandardFlowFileRecord.Builder()
.id(2L)
.addAttribute("uuid", "11111111-1111-1111-1111-111111111112")
.contentClaim(claim2)
.build();
final StandardRepositoryRecord rec2 = new StandardRepositoryRecord(queue);
rec2.setWorking(flowFile2);
rec2.setDestination(queue);
final List<RepositoryRecord> records = new ArrayList<>();
records.add(rec1);
records.add(rec2);
repo.updateRepository(records);
final String swapLocation = swapMgr.swapOut(Collections.singletonList(flowFile2), queue);
repo.swapFlowFilesOut(Collections.singletonList(flowFile2), queue, swapLocation);
}
final ResourceClaimManager recoveryClaimManager = new StandardResourceClaimManager();
try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository()) {
repo.initialize(recoveryClaimManager);
final long largestId = repo.loadFlowFiles(queueProvider, 0L);
// largest ID known is 1 because this doesn't take into account the FlowFiles that have been swapped out
assertEquals(1, largestId);
}
// resource claim 1 will have a single claimant count while resource claim 2 will have no claimant counts
// because resource claim 2 is referenced only by flowfiles that are swapped out.
assertEquals(1, recoveryClaimManager.getClaimantCount(resourceClaim1));
assertEquals(0, recoveryClaimManager.getClaimantCount(resourceClaim2));
final SwapSummary summary = queue.recoverSwappedFlowFiles();
assertNotNull(summary);
assertEquals(2, summary.getMaxFlowFileId().intValue());
assertEquals(new QueueSize(1, 0L), summary.getQueueSize());
final List<ResourceClaim> swappedOutClaims = summary.getResourceClaims();
assertNotNull(swappedOutClaims);
assertEquals(1, swappedOutClaims.size());
assertEquals(claim2.getResourceClaim(), swappedOutClaims.get(0));
}
@Test
public void testRestartWithOneRecord() throws IOException {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
final Path path = Paths.get("target/test-repo");
if (Files.exists(path)) {
FileUtils.deleteFile(path.toFile(), true);
@ -55,19 +165,7 @@ public class TestWriteAheadFlowFileRepository {
final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository();
repo.initialize(new StandardResourceClaimManager());
final List<Connection> connectionList = new ArrayList<>();
final QueueProvider queueProvider = new QueueProvider() {
@Override
public Collection<FlowFileQueue> getAllQueues() {
final List<FlowFileQueue> queueList = new ArrayList<>();
for (final Connection conn : connectionList) {
queueList.add(conn.getFlowFileQueue());
}
return queueList;
}
};
final TestQueueProvider queueProvider = new TestQueueProvider();
repo.loadFlowFiles(queueProvider, 0L);
final List<FlowFileRecord> flowFileCollection = new ArrayList<>();
@ -87,7 +185,7 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getFlowFileQueue()).thenReturn(queue);
connectionList.add(connection);
queueProvider.addConnection(connection);
StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(1L);
@ -132,4 +230,113 @@ public class TestWriteAheadFlowFileRepository {
repo2.close();
}
private static class TestQueueProvider implements QueueProvider {
private List<Connection> connectionList = new ArrayList<>();
public void addConnection(final Connection connection) {
this.connectionList.add(connection);
}
@Override
public Collection<FlowFileQueue> getAllQueues() {
final List<FlowFileQueue> queueList = new ArrayList<>();
for (final Connection conn : connectionList) {
queueList.add(conn.getFlowFileQueue());
}
return queueList;
}
}
private static class MockFlowFileSwapManager implements FlowFileSwapManager {
private final Map<FlowFileQueue, Map<String, List<FlowFileRecord>>> swappedRecords = new HashMap<>();
@Override
public void initialize(SwapManagerInitializationContext initializationContext) {
}
@Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
swapMap = new HashMap<>();
swappedRecords.put(flowFileQueue, swapMap);
}
final String location = UUID.randomUUID().toString();
swapMap.put(location, new ArrayList<>(flowFiles));
return location;
}
@Override
public List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
return Collections.unmodifiableList(swapMap.get(swapLocation));
}
@Override
public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
return swapMap.remove(swapLocation);
}
@Override
public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) {
return null;
}
return new ArrayList<>(swapMap.keySet());
}
@Override
@SuppressWarnings("deprecation")
public SwapSummary getSwapSummary(String swapLocation) throws IOException {
List<FlowFileRecord> records = null;
for (final Map<String, List<FlowFileRecord>> swapMap : swappedRecords.values()) {
records = swapMap.get(swapLocation);
if (records != null) {
break;
}
}
if (records == null) {
return null;
}
final List<ResourceClaim> resourceClaims = new ArrayList<>();
long size = 0L;
Long maxId = null;
for (final FlowFileRecord flowFile : records) {
size += flowFile.getSize();
if (maxId == null || flowFile.getId() > maxId) {
maxId = flowFile.getId();
}
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
resourceClaims.add(resourceClaim);
}
}
return new StandardSwapSummary(new QueueSize(records.size(), size), maxId, resourceClaims);
}
@Override
public void purge() {
this.swappedRecords.clear();
}
}
}