diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index e07cf1a84c..7092a6fe26 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -51,17 +51,17 @@ public interface FlowFileSwapManager { String swapOut(List flowFiles, FlowFileQueue flowFileQueue) throws IOException; /** - * Recovers the SwapFiles from the swap file that lives at the given location. This action + * Recovers the FlowFiles from the swap file that lives at the given location. This action * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file * at the given location remains in that location and the FlowFile Repository is not updated. * * @param swapLocation the location of the swap file * @param flowFileQueue the queue that the FlowFiles belong to - * @return the FlowFiles that live at the given swap location + * @return a SwapContents that includes the FlowFiles that live at the given swap location * * @throws IOException if unable to recover the FlowFiles from the given location */ - List peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException; + SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws IncompleteSwapFileException, IOException; /** * Recovers the FlowFiles from the swap file that lives at the given location and belongs @@ -71,12 +71,12 @@ public interface FlowFileSwapManager { * @param swapLocation the location of the swap file * @param flowFileQueue the queue to which the FlowFiles belong * - * @return the FlowFiles that are stored in the given location + * @return a SwapContents that includes FlowFiles that are stored in the given location * * @throws IOException if unable to recover the FlowFiles from the given location or update the * FlowFileRepository */ - List swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException; + SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IncompleteSwapFileException, IOException; /** * Determines swap files that exist for the given FlowFileQueue diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java new file mode 100644 index 0000000000..4408f02bc7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/IncompleteSwapFileException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.io.EOFException; + +/** + * Signals that a Swap File could not be complete read in/parsed because the data was + * not all present + */ +public class IncompleteSwapFileException extends EOFException { + private static final long serialVersionUID = -6818558584430076898L; + + private final String swapLocation; + private final SwapContents partialContents; + + public IncompleteSwapFileException(final String swapLocation, final SwapContents partialContents) { + super(); + this.swapLocation = swapLocation; + this.partialContents = partialContents; + } + + public String getSwapLocation() { + return swapLocation; + } + + public SwapContents getPartialContents() { + return partialContents; + } +} diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java new file mode 100644 index 0000000000..2fb7ba4555 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapContents.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.util.List; + +/** + * When FlowFiles are constructed from a Swap File, there is information in addition to + * the FlowFiles themselves that get stored and recovered. SwapContents provides a + * mechanism to encapsulate all of the information contained within a Swap File in a + * single object + */ +public interface SwapContents { + + /** + * @return a summary of information included in a Swap File + */ + SwapSummary getSummary(); + + /** + * @return the FlowFiles that are contained within a Swap File + */ + List getFlowFiles(); + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 00b52cc6b2..156389bbe8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -45,13 +45,16 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.IncompleteSwapFileException; import org.apache.nifi.controller.repository.StandardFlowFileRecord; +import org.apache.nifi.controller.repository.SwapContents; import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; @@ -130,33 +133,33 @@ public class FileSystemSwapManager implements FlowFileSwapManager { @Override - public List swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { + public SwapContents swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { final File swapFile = new File(swapLocation); - final List swappedFlowFiles = peek(swapLocation, flowFileQueue); - flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue); + final SwapContents swapContents = peek(swapLocation, flowFileQueue); + flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swapContents.getFlowFiles(), flowFileQueue); if (!swapFile.delete()) { warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"); } - return swappedFlowFiles; + return swapContents; } @Override - public List peek(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { + public SwapContents peek(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { final File swapFile = new File(swapLocation); if (!swapFile.exists()) { throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found"); } - final List swappedFlowFiles; + final SwapContents swapContents; try (final InputStream fis = new FileInputStream(swapFile); final InputStream bis = new BufferedInputStream(fis); final DataInputStream in = new DataInputStream(bis)) { - swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager); + swapContents = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager); } - return swappedFlowFiles; + return swapContents; } @@ -240,7 +243,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } @Override - @SuppressWarnings("deprecation") public SwapSummary getSwapSummary(final String swapLocation) throws IOException { final File swapFile = new File(swapLocation); @@ -258,35 +260,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager { throw new IOException(errMsg); } - in.readUTF(); // ignore Connection ID - final int numRecords = in.readInt(); - final long contentSize = in.readLong(); + final int numRecords; + final long contentSize; + Long maxRecordId = null; + try { + in.readUTF(); // ignore Connection ID + numRecords = in.readInt(); + contentSize = in.readLong(); - if (numRecords == 0) { + if (numRecords == 0) { + return StandardSwapSummary.EMPTY_SUMMARY; + } + + if (swapEncodingVersion > 7) { + maxRecordId = in.readLong(); + } + } catch (final EOFException eof) { + logger.warn("Found premature End-of-File when reading Swap File {}. EOF occurred before any FlowFiles were encountered", swapLocation); return StandardSwapSummary.EMPTY_SUMMARY; } - Long maxRecordId = null; - if (swapEncodingVersion > 7) { - maxRecordId = in.readLong(); - } - - // Before swap encoding version 8, we did not write out the max record id, so we have to read all - // swap files to determine the max record id - final List resourceClaims = new ArrayList<>(numRecords); - final List records = deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager); - for (final FlowFileRecord record : records) { - if (maxRecordId == null || record.getId() > maxRecordId) { - maxRecordId = record.getId(); - } - - final ContentClaim contentClaim = record.getContentClaim(); - if (contentClaim != null) { - resourceClaims.add(contentClaim.getResourceClaim()); - } - } - - return new StandardSwapSummary(new QueueSize(numRecords, contentSize), maxRecordId, resourceClaims); + final QueueSize queueSize = new QueueSize(numRecords, contentSize); + final SwapContents swapContents = deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, true, claimManager, swapLocation); + return swapContents.getSummary(); } } @@ -385,7 +381,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - static List deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { + static SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " @@ -398,115 +394,146 @@ public class FileSystemSwapManager implements FlowFileSwapManager { " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier()); } - final int numRecords = in.readInt(); - in.readLong(); // Content Size - if (swapEncodingVersion > 7) { - in.readLong(); // Max Record ID + int numRecords = 0; + long contentSize = 0L; + Long maxRecordId = null; + try { + numRecords = in.readInt(); + contentSize = in.readLong(); // Content Size + if (swapEncodingVersion > 7) { + maxRecordId = in.readLong(); // Max Record ID + } + } catch (final EOFException eof) { + final QueueSize queueSize = new QueueSize(numRecords, contentSize); + final SwapSummary summary = new StandardSwapSummary(queueSize, maxRecordId, Collections. emptyList()); + final SwapContents partialContents = new StandardSwapContents(summary, Collections. emptyList()); + throw new IncompleteSwapFileException(swapLocation, partialContents); } - return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager); + final QueueSize queueSize = new QueueSize(numRecords, contentSize); + return deserializeFlowFiles(in, queueSize, maxRecordId, swapEncodingVersion, false, claimManager, swapLocation); } - private static List deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, - final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException { - final List flowFiles = new ArrayList<>(); - for (int i = 0; i < numFlowFiles; i++) { - // legacy encoding had an "action" because it used to be couple with FlowFile Repository code - if (serializationVersion < 3) { - final int action = in.read(); - if (action != 1) { - throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type"); + private static SwapContents deserializeFlowFiles(final DataInputStream in, final QueueSize queueSize, final Long maxRecordId, + final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager, final String location) throws IOException { + final List flowFiles = new ArrayList<>(queueSize.getObjectCount()); + final List resourceClaims = new ArrayList<>(queueSize.getObjectCount()); + Long maxId = maxRecordId; + + for (int i = 0; i < queueSize.getObjectCount(); i++) { + try { + // legacy encoding had an "action" because it used to be couple with FlowFile Repository code + if (serializationVersion < 3) { + final int action = in.read(); + if (action != 1) { + throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type"); + } } + + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + final long recordId = in.readLong(); + if (maxId == null || recordId > maxId) { + maxId = recordId; + } + + ffBuilder.id(recordId); + ffBuilder.entryDate(in.readLong()); + + if (serializationVersion > 1) { + // Lineage information was added in version 2 + final int numLineageIdentifiers = in.readInt(); + final Set lineageIdentifiers = new HashSet<>(numLineageIdentifiers); + for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) { + lineageIdentifiers.add(in.readUTF()); + } + ffBuilder.lineageIdentifiers(lineageIdentifiers); + ffBuilder.lineageStartDate(in.readLong()); + + if (serializationVersion > 5) { + ffBuilder.lastQueueDate(in.readLong()); + } + } + + ffBuilder.size(in.readLong()); + + if (serializationVersion < 3) { + readString(in); // connection Id + } + + final boolean hasClaim = in.readBoolean(); + ResourceClaim resourceClaim = null; + if (hasClaim) { + final String claimId; + if (serializationVersion < 5) { + claimId = String.valueOf(in.readLong()); + } else { + claimId = in.readUTF(); + } + + final String container = in.readUTF(); + final String section = in.readUTF(); + + final long resourceOffset; + final long resourceLength; + if (serializationVersion < 6) { + resourceOffset = 0L; + resourceLength = -1L; + } else { + resourceOffset = in.readLong(); + resourceLength = in.readLong(); + } + + final long claimOffset = in.readLong(); + + final boolean lossTolerant; + if (serializationVersion >= 4) { + lossTolerant = in.readBoolean(); + } else { + lossTolerant = false; + } + + resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); + final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset); + claim.setLength(resourceLength); + + if (incrementContentClaims) { + claimManager.incrementClaimantCount(resourceClaim); + } + + ffBuilder.contentClaim(claim); + ffBuilder.contentClaimOffset(claimOffset); + } + + boolean attributesChanged = true; + if (serializationVersion < 3) { + attributesChanged = in.readBoolean(); + } + + if (attributesChanged) { + final int numAttributes = in.readInt(); + for (int j = 0; j < numAttributes; j++) { + final String key = readString(in); + final String value = readString(in); + + ffBuilder.addAttribute(key, value); + } + } + + final FlowFileRecord record = ffBuilder.build(); + if (resourceClaim != null) { + resourceClaims.add(resourceClaim); + } + + flowFiles.add(record); + } catch (final EOFException eof) { + final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims); + final SwapContents partialContents = new StandardSwapContents(swapSummary, flowFiles); + throw new IncompleteSwapFileException(location, partialContents); } - - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - ffBuilder.id(in.readLong()); - ffBuilder.entryDate(in.readLong()); - - if (serializationVersion > 1) { - // Lineage information was added in version 2 - final int numLineageIdentifiers = in.readInt(); - final Set lineageIdentifiers = new HashSet<>(numLineageIdentifiers); - for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; lineageIdIdx++) { - lineageIdentifiers.add(in.readUTF()); - } - ffBuilder.lineageIdentifiers(lineageIdentifiers); - ffBuilder.lineageStartDate(in.readLong()); - - if (serializationVersion > 5) { - ffBuilder.lastQueueDate(in.readLong()); - } - } - - ffBuilder.size(in.readLong()); - - if (serializationVersion < 3) { - readString(in); // connection Id - } - - final boolean hasClaim = in.readBoolean(); - if (hasClaim) { - final String claimId; - if (serializationVersion < 5) { - claimId = String.valueOf(in.readLong()); - } else { - claimId = in.readUTF(); - } - - final String container = in.readUTF(); - final String section = in.readUTF(); - - final long resourceOffset; - final long resourceLength; - if (serializationVersion < 6) { - resourceOffset = 0L; - resourceLength = -1L; - } else { - resourceOffset = in.readLong(); - resourceLength = in.readLong(); - } - - final long claimOffset = in.readLong(); - - final boolean lossTolerant; - if (serializationVersion >= 4) { - lossTolerant = in.readBoolean(); - } else { - lossTolerant = false; - } - - final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); - final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset); - claim.setLength(resourceLength); - - if (incrementContentClaims) { - claimManager.incrementClaimantCount(resourceClaim); - } - - ffBuilder.contentClaim(claim); - ffBuilder.contentClaimOffset(claimOffset); - } - - boolean attributesChanged = true; - if (serializationVersion < 3) { - attributesChanged = in.readBoolean(); - } - - if (attributesChanged) { - final int numAttributes = in.readInt(); - for (int j = 0; j < numAttributes; j++) { - final String key = readString(in); - final String value = readString(in); - - ffBuilder.addAttribute(key, value); - } - } - - final FlowFileRecord record = ffBuilder.build(); - flowFiles.add(record); } - return flowFiles; + final SwapSummary swapSummary = new StandardSwapSummary(queueSize, maxId, resourceClaims); + return new StandardSwapContents(swapSummary, flowFiles); } private static String readString(final InputStream in) throws IOException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 22aacdcfc7..0f3ffe0b8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -47,8 +47,10 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.IncompleteSwapFileException; import org.apache.nifi.controller.repository.RepositoryRecord; import org.apache.nifi.controller.repository.RepositoryRecordType; +import org.apache.nifi.controller.repository.SwapContents; import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -456,16 +458,15 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // first. if (!swapLocations.isEmpty()) { final String swapLocation = swapLocations.remove(0); + boolean partialContents = false; + SwapContents swapContents = null; try { - final List swappedIn = swapManager.swapIn(swapLocation, this); - long swapSize = 0L; - for (final FlowFileRecord flowFile : swappedIn) { - swapSize += flowFile.getSize(); - } - incrementSwapQueueSize(-swappedIn.size(), -swapSize, -1); - incrementActiveQueueSize(swappedIn.size(), swapSize); - activeQueue.addAll(swappedIn); - return; + swapContents = swapManager.swapIn(swapLocation, this); + } catch (final IncompleteSwapFileException isfe) { + logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation); + logger.error("", isfe); + swapContents = isfe.getPartialContents(); + partialContents = true; } catch (final FileNotFoundException fnfe) { logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation); if (eventReporter != null) { @@ -481,6 +482,28 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } return; } + + final QueueSize swapSize = swapContents.getSummary().getQueueSize(); + final long contentSize = swapSize.getByteCount(); + final int flowFileCount = swapSize.getObjectCount(); + incrementSwapQueueSize(-flowFileCount, -contentSize, -1); + + if (partialContents) { + // if we have partial results, we need to calculate the content size of the flowfiles + // actually swapped back in. + long contentSizeSwappedIn = 0L; + for (final FlowFileRecord swappedIn : swapContents.getFlowFiles()) { + contentSizeSwappedIn += swappedIn.getSize(); + } + + incrementActiveQueueSize(swapContents.getFlowFiles().size(), contentSizeSwappedIn); + } else { + // we swapped in the whole swap file. We can just use the info that we got from the summary. + incrementActiveQueueSize(flowFileCount, contentSize); + } + + activeQueue.addAll(swapContents.getFlowFiles()); + return; } // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense @@ -710,6 +733,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override + @SuppressWarnings("deprecation") public int compare(final FlowFileRecord f1, final FlowFileRecord f2) { int returnVal = 0; final boolean f1Penalized = f1.isPenalized(); @@ -1111,23 +1135,36 @@ public final class StandardFlowFileQueue implements FlowFileQueue { while (swapLocationItr.hasNext()) { final String swapLocation = swapLocationItr.next(); - List swappedIn = null; + SwapContents swapContents = null; try { if (dropRequest.getState() == DropFlowFileState.CANCELED) { logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); return; } - swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); - droppedSize = drop(swappedIn, requestor); + swapContents = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); + droppedSize = drop(swapContents.getFlowFiles(), requestor); + } catch (final IncompleteSwapFileException isfe) { + swapContents = isfe.getPartialContents(); + final String warnMsg = "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the file was corrupt. " + + "Some FlowFiles may not be dropped from the queue until NiFi is restarted."; + + logger.warn(warnMsg); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", warnMsg); + } } catch (final IOException ioe) { logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}", swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); logger.error("", ioe); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, "Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + swapLocation + + ". The FlowFiles contained in this Swap File will not be dropped from the queue"); + } dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString()); - if (swappedIn != null) { - activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. + if (swapContents != null) { + activeQueue.addAll(swapContents.getFlowFiles()); // ensure that we don't lose the FlowFiles from our queue. } return; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java new file mode 100644 index 0000000000..7e64a2a9cb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/StandardSwapContents.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.swap; + +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.SwapContents; +import org.apache.nifi.controller.repository.SwapSummary; + +public class StandardSwapContents implements SwapContents { + public static final SwapContents EMPTY_SWAP_CONTENTS = new StandardSwapContents(StandardSwapSummary.EMPTY_SUMMARY, Collections. emptyList()); + + private final SwapSummary summary; + private final List flowFiles; + + public StandardSwapContents(final SwapSummary summary, final List flowFiles) { + this.summary = summary; + this.flowFiles = flowFiles; + } + + @Override + public SwapSummary getSummary() { + return summary; + } + + @Override + public List getFlowFiles() { + return flowFiles; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index f7191c5378..fcfd524ef4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.SwapContents; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; @@ -57,7 +58,8 @@ public class TestFileSystemSwapManager { final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); - final List records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager()); + final SwapContents swapContents = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager()); + final List records = swapContents.getFlowFiles(); assertEquals(10000, records.size()); for (final FlowFileRecord record : records) { @@ -68,6 +70,7 @@ public class TestFileSystemSwapManager { } @Test + @SuppressWarnings("deprecation") public void testRoundTripSerializeDeserialize() throws IOException { final List toSwap = new ArrayList<>(10000); final Map attrs = new HashMap<>(); @@ -88,16 +91,16 @@ public class TestFileSystemSwapManager { FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); } - final List swappedIn; + final SwapContents swappedIn; try (final FileInputStream fis = new FileInputStream(swapFile); final DataInputStream dis = new DataInputStream(fis)) { swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class)); } - assertEquals(toSwap.size(), swappedIn.size()); + assertEquals(toSwap.size(), swappedIn.getFlowFiles().size()); for (int i = 0; i < toSwap.size(); i++) { final FlowFileRecord pre = toSwap.get(i); - final FlowFileRecord post = swappedIn.get(i); + final FlowFileRecord post = swappedIn.getFlowFiles().get(i); assertEquals(pre.getSize(), post.getSize()); assertEquals(pre.getAttributes(), post.getAttributes()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 412e3769b1..fc33d99c0f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -45,11 +45,14 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileSwapManager; +import org.apache.nifi.controller.repository.IncompleteSwapFileException; +import org.apache.nifi.controller.repository.SwapContents; import org.apache.nifi.controller.repository.SwapManagerInitializationContext; import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; @@ -378,6 +381,68 @@ public class TestStandardFlowFileQueue { queue.poll(exp); } + @Test + public void testQueueCountsUpdatedWhenIncompleteSwapFile() { + for (int i = 1; i <= 20000; i++) { + queue.put(new TestFlowFile()); + } + + assertEquals(20000, queue.size().getObjectCount()); + assertEquals(20000, queue.size().getByteCount()); + + assertEquals(1, swapManager.swappedOut.size()); + + // when we swap in, cause an IncompleteSwapFileException to be + // thrown and contain only 9,999 of the 10,000 FlowFiles + swapManager.enableIncompleteSwapFileException(9999); + final Set expired = Collections.emptySet(); + FlowFileRecord flowFile; + + for (int i = 0; i < 10000; i++) { + flowFile = queue.poll(expired); + assertNotNull(flowFile); + queue.acknowledge(Collections.singleton(flowFile)); + } + + // 10,000 FlowFiles on queue - all swapped out + assertEquals(10000, queue.size().getObjectCount()); + assertEquals(10000, queue.size().getByteCount()); + assertEquals(1, swapManager.swappedOut.size()); + assertEquals(0, swapManager.swapInCalledCount); + + // Trigger swap in. This will remove 1 FlowFile from queue, leaving 9,999 but + // on swap in, we will get only 9,999 FlowFiles put onto the queue, and the queue size will + // be decremented by 10,000 (because the Swap File's header tells us that there are 10K + // FlowFiles, even though only 9999 are in the swap file) + flowFile = queue.poll(expired); + assertNotNull(flowFile); + queue.acknowledge(Collections.singleton(flowFile)); + + // size should be 9,998 because we lost 1 on Swap In, and then we pulled one above. + assertEquals(9998, queue.size().getObjectCount()); + assertEquals(9998, queue.size().getByteCount()); + assertEquals(0, swapManager.swappedOut.size()); + assertEquals(1, swapManager.swapInCalledCount); + + for (int i = 0; i < 9998; i++) { + flowFile = queue.poll(expired); + assertNotNull("Null FlowFile when i = " + i, flowFile); + queue.acknowledge(Collections.singleton(flowFile)); + + final QueueSize queueSize = queue.size(); + assertEquals(9998 - i - 1, queueSize.getObjectCount()); + assertEquals(9998 - i - 1, queueSize.getByteCount()); + } + + final QueueSize queueSize = queue.size(); + assertEquals(0, queueSize.getObjectCount()); + assertEquals(0L, queueSize.getByteCount()); + + flowFile = queue.poll(expired); + assertNull(flowFile); + } + + @Test(timeout = 120000) public void testDropSwappedFlowFiles() { for (int i = 1; i <= 210000; i++) { @@ -445,11 +510,21 @@ public class TestStandardFlowFileQueue { int swapOutCalledCount = 0; int swapInCalledCount = 0; + private int incompleteSwapFileRecordsToInclude = -1; + @Override public void initialize(final SwapManagerInitializationContext initializationContext) { } + public void enableIncompleteSwapFileException(final int flowFilesToInclude) { + incompleteSwapFileRecordsToInclude = flowFilesToInclude; + } + + public void disableIncompleteSwapFileException() { + this.incompleteSwapFileRecordsToInclude = -1; + } + @Override public String swapOut(List flowFiles, FlowFileQueue flowFileQueue) throws IOException { swapOutCalledCount++; @@ -458,15 +533,34 @@ public class TestStandardFlowFileQueue { return location; } - @Override - public List peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { - return new ArrayList(swappedOut.get(swapLocation)); + private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException { + if (incompleteSwapFileRecordsToInclude > -1) { + final SwapSummary summary = getSwapSummary(swapLocation); + + final List records; + if (remove) { + records = swappedOut.remove(swapLocation); + } else { + records = swappedOut.get(swapLocation); + } + + final List partial = records.subList(0, incompleteSwapFileRecordsToInclude); + final SwapContents partialContents = new StandardSwapContents(summary, partial); + throw new IncompleteSwapFileException(swapLocation, partialContents); + } } @Override - public List swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { + public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException { + throwIncompleteIfNecessary(swapLocation, false); + return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation)); + } + + @Override + public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { swapInCalledCount++; - return swappedOut.remove(swapLocation); + throwIncompleteIfNecessary(swapLocation, true); + return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.remove(swapLocation)); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index c1c7b4541d..cd4aa2799c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -46,6 +46,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.util.file.FileUtils; import org.junit.Before; @@ -156,6 +157,7 @@ public class TestWriteAheadFlowFileRepository { } @Test + @SuppressWarnings("deprecation") public void testRestartWithOneRecord() throws IOException { final Path path = Paths.get("target/test-repo"); if (Files.exists(path)) { @@ -269,23 +271,27 @@ public class TestWriteAheadFlowFileRepository { } @Override - public List peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { + public SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { Map> swapMap = swappedRecords.get(flowFileQueue); if (swapMap == null) { return null; } - return Collections.unmodifiableList(swapMap.get(swapLocation)); + final List flowFiles = swapMap.get(swapLocation); + final SwapSummary summary = getSwapSummary(swapLocation); + return new StandardSwapContents(summary, flowFiles); } @Override - public List swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { + public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException { Map> swapMap = swappedRecords.get(flowFileQueue); if (swapMap == null) { return null; } - return swapMap.remove(swapLocation); + final List flowFiles = swapMap.remove(swapLocation); + final SwapSummary summary = getSwapSummary(swapLocation); + return new StandardSwapContents(summary, flowFiles); } @Override