NIFI-15: Address issue where incomplete swap files can result in continually attempting to swap in data without ever being successful

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Mark Payne 2016-02-21 14:03:37 -05:00 committed by Aldrin Piri
parent 7f15626af5
commit 2839a2f215
9 changed files with 467 additions and 168 deletions

View File

@ -51,17 +51,17 @@ public interface FlowFileSwapManager {
String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException; String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException;
/** /**
* Recovers the SwapFiles from the swap file that lives at the given location. This action * Recovers the FlowFiles from the swap file that lives at the given location. This action
* provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
* at the given location remains in that location and the FlowFile Repository is not updated. * at the given location remains in that location and the FlowFile Repository is not updated.
* *
* @param swapLocation the location of the swap file * @param swapLocation the location of the swap file
* @param flowFileQueue the queue that the FlowFiles belong to * @param flowFileQueue the queue that the FlowFiles belong to
* @return the FlowFiles that live at the given swap location * @return a SwapContents that includes the FlowFiles that live at the given swap location
* *
* @throws IOException if unable to recover the FlowFiles from the given location * @throws IOException if unable to recover the FlowFiles from the given location
*/ */
List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException; SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws IncompleteSwapFileException, IOException;
/** /**
* Recovers the FlowFiles from the swap file that lives at the given location and belongs * Recovers the FlowFiles from the swap file that lives at the given location and belongs
@ -71,12 +71,12 @@ public interface FlowFileSwapManager {
* @param swapLocation the location of the swap file * @param swapLocation the location of the swap file
* @param flowFileQueue the queue to which the FlowFiles belong * @param flowFileQueue the queue to which the FlowFiles belong
* *
* @return the FlowFiles that are stored in the given location * @return a SwapContents that includes FlowFiles that are stored in the given location
* *
* @throws IOException if unable to recover the FlowFiles from the given location or update the * @throws IOException if unable to recover the FlowFiles from the given location or update the
* FlowFileRepository * FlowFileRepository
*/ */
List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException; SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IncompleteSwapFileException, IOException;
/** /**
* Determines swap files that exist for the given FlowFileQueue * Determines swap files that exist for the given FlowFileQueue

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import java.io.EOFException;
/**
* Signals that a Swap File could not be complete read in/parsed because the data was
* not all present
*/
public class IncompleteSwapFileException extends EOFException {
private static final long serialVersionUID = -6818558584430076898L;
private final String swapLocation;
private final SwapContents partialContents;
public IncompleteSwapFileException(final String swapLocation, final SwapContents partialContents) {
super();
this.swapLocation = swapLocation;
this.partialContents = partialContents;
}
public String getSwapLocation() {
return swapLocation;
}
public SwapContents getPartialContents() {
return partialContents;
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import java.util.List;
/**
* When FlowFiles are constructed from a Swap File, there is information in addition to
* the FlowFiles themselves that get stored and recovered. SwapContents provides a
* mechanism to encapsulate all of the information contained within a Swap File in a
* single object
*/
public interface SwapContents {
/**
* @return a summary of information included in a Swap File
*/
SwapSummary getSummary();
/**
* @return the FlowFiles that are contained within a Swap File
*/
List<FlowFileRecord> getFlowFiles();
}

View File

@ -45,13 +45,16 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.StandardFlowFileRecord; import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
@ -130,33 +133,33 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
@Override @Override
public List<FlowFileRecord> swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
final File swapFile = new File(swapLocation); final File swapFile = new File(swapLocation);
final List<FlowFileRecord> swappedFlowFiles = peek(swapLocation, flowFileQueue); final SwapContents swapContents = peek(swapLocation, flowFileQueue);
flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue); flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swapContents.getFlowFiles(), flowFileQueue);
if (!swapFile.delete()) { if (!swapFile.delete()) {
warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"); warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
} }
return swappedFlowFiles; return swapContents;
} }
@Override @Override
public List<FlowFileRecord> peek(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { public SwapContents peek(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
final File swapFile = new File(swapLocation); final File swapFile = new File(swapLocation);
if (!swapFile.exists()) { if (!swapFile.exists()) {
throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found"); throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
} }
final List<FlowFileRecord> swappedFlowFiles; final SwapContents swapContents;
try (final InputStream fis = new FileInputStream(swapFile); try (final InputStream fis = new FileInputStream(swapFile);
final InputStream bis = new BufferedInputStream(fis); final InputStream bis = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bis)) { final DataInputStream in = new DataInputStream(bis)) {
swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager); swapContents = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
} }
return swappedFlowFiles; return swapContents;
} }
@ -240,7 +243,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
@Override @Override
@SuppressWarnings("deprecation")
public SwapSummary getSwapSummary(final String swapLocation) throws IOException { public SwapSummary getSwapSummary(final String swapLocation) throws IOException {
final File swapFile = new File(swapLocation); final File swapFile = new File(swapLocation);
@ -258,35 +260,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
throw new IOException(errMsg); throw new IOException(errMsg);
} }
in.readUTF(); // ignore Connection ID final int numRecords;
final int numRecords = in.readInt(); final long contentSize;
final long contentSize = in.readLong(); Long maxRecordId = null;
try {
in.readUTF(); // ignore Connection ID
numRecords = in.readInt();
contentSize = in.readLong();
if (numRecords == 0) { if (numRecords == 0) {
return StandardSwapSummary.EMPTY_SUMMARY;
}
if (swapEncodingVersion > 7) {
maxRecordId = in.readLong();
}
} catch (final EOFException eof) {
logger.warn("Found premature End-of-File when reading Swap File {}. EOF occurred before any FlowFiles were encountered", swapLocation);
return StandardSwapSummary.EMPTY_SUMMARY; return StandardSwapSummary.EMPTY_SUMMARY;
} }
Long maxRecordId = null; final QueueSize queueSize = new QueueSize(numRecords, contentSize);
if (swapEncodingVersion > 7) { final SwapContents swapContents = deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation);
maxRecordId = in.readLong(); return swapContents.getSummary();
}
// Before swap encoding version 8, we did not write out the max record id, so we have to read all
// swap files to determine the max record id
final List<ResourceClaim> resourceClaims = new ArrayList<>(numRecords);
final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
for (final FlowFileRecord record : records) {
if (maxRecordId == null || record.getId() > maxRecordId) {
maxRecordId = record.getId();
}
final ContentClaim contentClaim = record.getContentClaim();
if (contentClaim != null) {
resourceClaims.add(contentClaim.getResourceClaim());
}
}
return new StandardSwapSummary(new QueueSize(numRecords, contentSize), maxRecordId, resourceClaims);
} }
} }
@ -385,7 +381,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { static SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
final int swapEncodingVersion = in.readInt(); final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) { if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
@ -398,115 +394,146 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
" because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier()); " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
} }
final int numRecords = in.readInt(); int numRecords = 0;
in.readLong(); // Content Size long contentSize = 0L;
if (swapEncodingVersion > 7) { Long maxRecordId = null;
in.readLong(); // Max Record ID try {
numRecords = in.readInt();
contentSize = in.readLong(); // Content Size
if (swapEncodingVersion > 7) {
maxRecordId = in.readLong(); // Max Record ID
}
} catch (final EOFException eof) {
final QueueSize queueSize = new QueueSize(numRecords, contentSize);
final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections.<ResourceClaim> emptyList());
final SwapContents partialContents = new StandardSwapContents(summary, Collections.<FlowFileRecord> emptyList());
throw new IncompleteSwapFileException(swapLocation, partialContents);
} }
return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager); final QueueSize queueSize = new QueueSize(numRecords, contentSize);
return deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, false, claimManager, swapLocation);
} }
private static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId,
final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException { final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager, final String location) throws IOException {
final List<FlowFileRecord> flowFiles = new ArrayList<>(); final List<FlowFileRecord> flowFiles = new ArrayList<>(queueSize.getObjectCount());
for (int i = 0; i < numFlowFiles; i++) { final List<ResourceClaim> resourceClaims = new ArrayList<>(queueSize.getObjectCount());
// legacy encoding had an "action" because it used to be couple with FlowFile Repository code Long maxId = maxRecordId;
if (serializationVersion < 3) {
final int action = in.read(); for (int i = 0; i < queueSize.getObjectCount(); i++) {
if (action != 1) { try {
throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type"); // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
if (serializationVersion < 3) {
final int action = in.read();
if (action != 1) {
throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
}
} }
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
final long recordId = in.readLong();
if (maxId == null || recordId > maxId) {
maxId = recordId;
}
ffBuilder.id(recordId);
ffBuilder.entryDate(in.readLong());
if (serializationVersion > 1) {
// Lineage information was added in version 2
final int numLineageIdentifiers = in.readInt();
final Set<String> lineageIdentifiers = new HashSet<>(numLineageIdentifiers);
for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
ffBuilder.lineageStartDate(in.readLong());
if (serializationVersion > 5) {
ffBuilder.lastQueueDate(in.readLong());
}
}
ffBuilder.size(in.readLong());
if (serializationVersion < 3) {
readString(in); // connection Id
}
final boolean hasClaim = in.readBoolean();
ResourceClaim resourceClaim = null;
if (hasClaim) {
final String claimId;
if (serializationVersion < 5) {
claimId = String.valueOf(in.readLong());
} else {
claimId = in.readUTF();
}
final String container = in.readUTF();
final String section = in.readUTF();
final long resourceOffset;
final long resourceLength;
if (serializationVersion < 6) {
resourceOffset = 0L;
resourceLength = -1L;
} else {
resourceOffset = in.readLong();
resourceLength = in.readLong();
}
final long claimOffset = in.readLong();
final boolean lossTolerant;
if (serializationVersion >= 4) {
lossTolerant = in.readBoolean();
} else {
lossTolerant = false;
}
resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
claim.setLength(resourceLength);
if (incrementContentClaims) {
claimManager.incrementClaimantCount(resourceClaim);
}
ffBuilder.contentClaim(claim);
ffBuilder.contentClaimOffset(claimOffset);
}
boolean attributesChanged = true;
if (serializationVersion < 3) {
attributesChanged = in.readBoolean();
}
if (attributesChanged) {
final int numAttributes = in.readInt();
for (int j = 0; j < numAttributes; j++) {
final String key = readString(in);
final String value = readString(in);
ffBuilder.addAttribute(key, value);
}
}
final FlowFileRecord record = ffBuilder.build();
if (resourceClaim != null) {
resourceClaims.add(resourceClaim);
}
flowFiles.add(record);
} catch (final EOFException eof) {
final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles);
throw new IncompleteSwapFileException(location, partialContents);
} }
final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
ffBuilder.id(in.readLong());
ffBuilder.entryDate(in.readLong());
if (serializationVersion > 1) {
// Lineage information was added in version 2
final int numLineageIdentifiers = in.readInt();
final Set<String> lineageIdentifiers = new HashSet<>(numLineageIdentifiers);
for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) {
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
ffBuilder.lineageStartDate(in.readLong());
if (serializationVersion > 5) {
ffBuilder.lastQueueDate(in.readLong());
}
}
ffBuilder.size(in.readLong());
if (serializationVersion < 3) {
readString(in); // connection Id
}
final boolean hasClaim = in.readBoolean();
if (hasClaim) {
final String claimId;
if (serializationVersion < 5) {
claimId = String.valueOf(in.readLong());
} else {
claimId = in.readUTF();
}
final String container = in.readUTF();
final String section = in.readUTF();
final long resourceOffset;
final long resourceLength;
if (serializationVersion < 6) {
resourceOffset = 0L;
resourceLength = -1L;
} else {
resourceOffset = in.readLong();
resourceLength = in.readLong();
}
final long claimOffset = in.readLong();
final boolean lossTolerant;
if (serializationVersion >= 4) {
lossTolerant = in.readBoolean();
} else {
lossTolerant = false;
}
final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
claim.setLength(resourceLength);
if (incrementContentClaims) {
claimManager.incrementClaimantCount(resourceClaim);
}
ffBuilder.contentClaim(claim);
ffBuilder.contentClaimOffset(claimOffset);
}
boolean attributesChanged = true;
if (serializationVersion < 3) {
attributesChanged = in.readBoolean();
}
if (attributesChanged) {
final int numAttributes = in.readInt();
for (int j = 0; j < numAttributes; j++) {
final String key = readString(in);
final String value = readString(in);
ffBuilder.addAttribute(key, value);
}
}
final FlowFileRecord record = ffBuilder.build();
flowFiles.add(record);
} }
return flowFiles; final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims);
return new StandardSwapContents(swapSummary, flowFiles);
} }
private static String readString(final InputStream in) throws IOException { private static String readString(final InputStream in) throws IOException {

View File

@ -47,8 +47,10 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.RepositoryRecord; import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType; import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
@ -456,16 +458,15 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// first. // first.
if (!swapLocations.isEmpty()) { if (!swapLocations.isEmpty()) {
final String swapLocation = swapLocations.remove(0); final String swapLocation = swapLocations.remove(0);
boolean partialContents = false;
SwapContents swapContents = null;
try { try {
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this); swapContents = swapManager.swapIn(swapLocation, this);
long swapSize = 0L; } catch (final IncompleteSwapFileException isfe) {
for (final FlowFileRecord flowFile : swappedIn) { logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation);
swapSize += flowFile.getSize(); logger.error("", isfe);
} swapContents = isfe.getPartialContents();
incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1); partialContents = true;
incrementActiveQueueSize(swappedIn.size(), swapSize);
activeQueue.addAll(swappedIn);
return;
} catch (final FileNotFoundException fnfe) { } catch (final FileNotFoundException fnfe) {
logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation); logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
if (eventReporter != null) { if (eventReporter != null) {
@ -481,6 +482,28 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
return; return;
} }
final QueueSize swapSize = swapContents.getSummary().getQueueSize();
final long contentSize = swapSize.getByteCount();
final int flowFileCount = swapSize.getObjectCount();
incrementSwapQueueSize(-flowFileCount, -contentSize, -1);
if (partialContents) {
// if we have partial results, we need to calculate the content size of the flowfiles
// actually swapped back in.
long contentSizeSwappedIn = 0L;
for (final FlowFileRecord swappedIn : swapContents.getFlowFiles()) {
contentSizeSwappedIn += swappedIn.getSize();
}
incrementActiveQueueSize(swapContents.getFlowFiles().size(), contentSizeSwappedIn);
} else {
// we swapped in the whole swap file. We can just use the info that we got from the summary.
incrementActiveQueueSize(flowFileCount, contentSize);
}
activeQueue.addAll(swapContents.getFlowFiles());
return;
} }
// this is the most common condition (nothing is swapped out), so do the check first and avoid the expense // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
@ -710,6 +733,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
@Override @Override
@SuppressWarnings("deprecation")
public int compare(final FlowFileRecord f1, final FlowFileRecord f2) { public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
int returnVal = 0; int returnVal = 0;
final boolean f1Penalized = f1.isPenalized(); final boolean f1Penalized = f1.isPenalized();
@ -1111,23 +1135,36 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
while (swapLocationItr.hasNext()) { while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next(); final String swapLocation = swapLocationItr.next();
List<FlowFileRecord> swappedIn = null; SwapContents swapContents = null;
try { try {
if (dropRequest.getState() == DropFlowFileState.CANCELED) { if (dropRequest.getState() == DropFlowFileState.CANCELED) {
logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
return; return;
} }
swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); swapContents = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
droppedSize = drop(swappedIn, requestor); droppedSize = drop(swapContents.getFlowFiles(), requestor);
} catch (final IncompleteSwapFileException isfe) {
swapContents = isfe.getPartialContents();
final String warnMsg = "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the file was corrupt. "
+ "Some FlowFiles may not be dropped from the queue until NiFi is restarted.";
logger.warn(warnMsg);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", warnMsg);
}
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}", logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
logger.error("", ioe); logger.error("", ioe);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, "Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + swapLocation
+ ". The FlowFiles contained in this Swap File will not be dropped from the queue");
}
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString()); dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
if (swappedIn != null) { if (swapContents != null) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. activeQueue.addAll(swapContents.getFlowFiles()); // ensure that we don't lose the FlowFiles from our queue.
} }
return; return;

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.swap;
import java.util.Collections;
import java.util.List;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary;
public class StandardSwapContents implements SwapContents {
public static final SwapContents EMPTY_SWAP_CONTENTS = new StandardSwapContents(StandardSwapSummary.EMPTY_SUMMARY, Collections.<FlowFileRecord> emptyList());
private final SwapSummary summary;
private final List<FlowFileRecord> flowFiles;
public StandardSwapContents(final SwapSummary summary, final List<FlowFileRecord> flowFiles) {
this.summary = summary;
this.flowFiles = flowFiles;
}
@Override
public SwapSummary getSummary() {
return summary;
}
@Override
public List<FlowFileRecord> getFlowFiles() {
return flowFiles;
}
}

View File

@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@ -57,7 +58,8 @@ public class TestFileSystemSwapManager {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager()); final SwapContents swapContents = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
final List<FlowFileRecord> records = swapContents.getFlowFiles();
assertEquals(10000, records.size()); assertEquals(10000, records.size());
for (final FlowFileRecord record : records) { for (final FlowFileRecord record : records) {
@ -68,6 +70,7 @@ public class TestFileSystemSwapManager {
} }
@Test @Test
@SuppressWarnings("deprecation")
public void testRoundTripSerializeDeserialize() throws IOException { public void testRoundTripSerializeDeserialize() throws IOException {
final List<FlowFileRecord> toSwap = new ArrayList<>(10000); final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
@ -88,16 +91,16 @@ public class TestFileSystemSwapManager {
FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
} }
final List<FlowFileRecord> swappedIn; final SwapContents swappedIn;
try (final FileInputStream fis = new FileInputStream(swapFile); try (final FileInputStream fis = new FileInputStream(swapFile);
final DataInputStream dis = new DataInputStream(fis)) { final DataInputStream dis = new DataInputStream(fis)) {
swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class)); swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
} }
assertEquals(toSwap.size(), swappedIn.size()); assertEquals(toSwap.size(), swappedIn.getFlowFiles().size());
for (int i = 0; i < toSwap.size(); i++) { for (int i = 0; i < toSwap.size(); i++) {
final FlowFileRecord pre = toSwap.get(i); final FlowFileRecord pre = toSwap.get(i);
final FlowFileRecord post = swappedIn.get(i); final FlowFileRecord post = swappedIn.getFlowFiles().get(i);
assertEquals(pre.getSize(), post.getSize()); assertEquals(pre.getSize(), post.getSize());
assertEquals(pre.getAttributes(), post.getAttributes()); assertEquals(pre.getAttributes(), post.getAttributes());

View File

@ -45,11 +45,14 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.FlowFilePrioritizer;
@ -378,6 +381,68 @@ public class TestStandardFlowFileQueue {
queue.poll(exp); queue.poll(exp);
} }
@Test
public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
for (int i = 1; i <= 20000; i++) {
queue.put(new TestFlowFile());
}
assertEquals(20000, queue.size().getObjectCount());
assertEquals(20000, queue.size().getByteCount());
assertEquals(1, swapManager.swappedOut.size());
// when we swap in, cause an IncompleteSwapFileException to be
// thrown and contain only 9,999 of the 10,000 FlowFiles
swapManager.enableIncompleteSwapFileException(9999);
final Set<FlowFileRecord> expired = Collections.emptySet();
FlowFileRecord flowFile;
for (int i = 0; i < 10000; i++) {
flowFile = queue.poll(expired);
assertNotNull(flowFile);
queue.acknowledge(Collections.singleton(flowFile));
}
// 10,000 FlowFiles on queue - all swapped out
assertEquals(10000, queue.size().getObjectCount());
assertEquals(10000, queue.size().getByteCount());
assertEquals(1, swapManager.swappedOut.size());
assertEquals(0, swapManager.swapInCalledCount);
// Trigger swap in. This will remove 1 FlowFile from queue, leaving 9,999 but
// on swap in, we will get only 9,999 FlowFiles put onto the queue, and the queue size will
// be decremented by 10,000 (because the Swap File's header tells us that there are 10K
// FlowFiles, even though only 9999 are in the swap file)
flowFile = queue.poll(expired);
assertNotNull(flowFile);
queue.acknowledge(Collections.singleton(flowFile));
// size should be 9,998 because we lost 1 on Swap In, and then we pulled one above.
assertEquals(9998, queue.size().getObjectCount());
assertEquals(9998, queue.size().getByteCount());
assertEquals(0, swapManager.swappedOut.size());
assertEquals(1, swapManager.swapInCalledCount);
for (int i = 0; i < 9998; i++) {
flowFile = queue.poll(expired);
assertNotNull("Null FlowFile when i = " + i, flowFile);
queue.acknowledge(Collections.singleton(flowFile));
final QueueSize queueSize = queue.size();
assertEquals(9998 - i - 1, queueSize.getObjectCount());
assertEquals(9998 - i - 1, queueSize.getByteCount());
}
final QueueSize queueSize = queue.size();
assertEquals(0, queueSize.getObjectCount());
assertEquals(0L, queueSize.getByteCount());
flowFile = queue.poll(expired);
assertNull(flowFile);
}
@Test(timeout = 120000) @Test(timeout = 120000)
public void testDropSwappedFlowFiles() { public void testDropSwappedFlowFiles() {
for (int i = 1; i <= 210000; i++) { for (int i = 1; i <= 210000; i++) {
@ -445,11 +510,21 @@ public class TestStandardFlowFileQueue {
int swapOutCalledCount = 0; int swapOutCalledCount = 0;
int swapInCalledCount = 0; int swapInCalledCount = 0;
private int incompleteSwapFileRecordsToInclude = -1;
@Override @Override
public void initialize(final SwapManagerInitializationContext initializationContext) { public void initialize(final SwapManagerInitializationContext initializationContext) {
} }
public void enableIncompleteSwapFileException(final int flowFilesToInclude) {
incompleteSwapFileRecordsToInclude = flowFilesToInclude;
}
public void disableIncompleteSwapFileException() {
this.incompleteSwapFileRecordsToInclude = -1;
}
@Override @Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException { public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
swapOutCalledCount++; swapOutCalledCount++;
@ -458,15 +533,34 @@ public class TestStandardFlowFileQueue {
return location; return location;
} }
@Override private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException {
public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { if (incompleteSwapFileRecordsToInclude > -1) {
return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation)); final SwapSummary summary = getSwapSummary(swapLocation);
final List<FlowFileRecord> records;
if (remove) {
records = swappedOut.remove(swapLocation);
} else {
records = swappedOut.get(swapLocation);
}
final List<FlowFileRecord> partial = records.subList(0, incompleteSwapFileRecordsToInclude);
final SwapContents partialContents = new StandardSwapContents(summary, partial);
throw new IncompleteSwapFileException(swapLocation, partialContents);
}
} }
@Override @Override
public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
throwIncompleteIfNecessary(swapLocation, false);
return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation));
}
@Override
public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
swapInCalledCount++; swapInCalledCount++;
return swappedOut.remove(swapLocation); throwIncompleteIfNecessary(swapLocation, true);
return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.remove(swapLocation));
} }
@Override @Override

View File

@ -46,6 +46,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
import org.junit.Before; import org.junit.Before;
@ -156,6 +157,7 @@ public class TestWriteAheadFlowFileRepository {
} }
@Test @Test
@SuppressWarnings("deprecation")
public void testRestartWithOneRecord() throws IOException { public void testRestartWithOneRecord() throws IOException {
final Path path = Paths.get("target/test-repo"); final Path path = Paths.get("target/test-repo");
if (Files.exists(path)) { if (Files.exists(path)) {
@ -269,23 +271,27 @@ public class TestWriteAheadFlowFileRepository {
} }
@Override @Override
public List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { public SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue); Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) { if (swapMap == null) {
return null; return null;
} }
return Collections.unmodifiableList(swapMap.get(swapLocation)); final List<FlowFileRecord> flowFiles = swapMap.get(swapLocation);
final SwapSummary summary = getSwapSummary(swapLocation);
return new StandardSwapContents(summary, flowFiles);
} }
@Override @Override
public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue); Map<String, List<FlowFileRecord>> swapMap = swappedRecords.get(flowFileQueue);
if (swapMap == null) { if (swapMap == null) {
return null; return null;
} }
return swapMap.remove(swapLocation); final List<FlowFileRecord> flowFiles = swapMap.remove(swapLocation);
final SwapSummary summary = getSwapSummary(swapLocation);
return new StandardSwapContents(summary, flowFiles);
} }
@Override @Override