NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather than in a background thread

This commit is contained in:
Mark Payne 2015-10-12 13:27:07 -04:00
parent b8c51dc35d
commit 49a781df2d
22 changed files with 1158 additions and 628 deletions

View File

@ -25,7 +25,8 @@ public enum DropFlowFileState {
WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"), WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"),
DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"), DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"),
COMPLETE("Completed Successfully"), COMPLETE("Completed Successfully"),
FAILURE("Failed"); FAILURE("Failed"),
CANCELED("Cancelled by User");
private final String description; private final String description;

View File

@ -42,6 +42,12 @@ public interface DropFlowFileStatus {
*/ */
long getRequestSubmissionTime(); long getRequestSubmissionTime();
/**
* @return the date/time (in milliseconds since epoch) at which the status of the
* request was last updated
*/
long getLastUpdated();
/** /**
* @return the size of the queue when the drop request was issued or <code>null</code> if * @return the size of the queue when the drop request was issued or <code>null</code> if
* it is not yet known, which can happen if the {@link DropFlowFileState} is * it is not yet known, which can happen if the {@link DropFlowFileState} is
@ -58,5 +64,4 @@ public interface DropFlowFileStatus {
* @return the current state of the operation * @return the current state of the operation
*/ */
DropFlowFileState getState(); DropFlowFileState getState();
} }

View File

@ -59,13 +59,6 @@ public interface FlowFileQueue {
*/ */
void purgeSwapFiles(); void purgeSwapFiles();
/**
* @return the minimum number of FlowFiles that must be present in order for
* FlowFiles to begin being swapped out of the queue
*/
// TODO: REMOVE THIS.
int getSwapThreshold();
/** /**
* Resets the comparator used by this queue to maintain order. * Resets the comparator used by this queue to maintain order.
* *
@ -112,12 +105,8 @@ public interface FlowFileQueue {
* not include those FlowFiles that have been swapped out or are currently * not include those FlowFiles that have been swapped out or are currently
* being processed * being processed
*/ */
// TODO: REMOVE?
boolean isActiveQueueEmpty(); boolean isActiveQueueEmpty();
// TODO: REMOVE?
QueueSize getActiveQueueSize();
/** /**
* Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
* is considered to be unacknowledged if it has been pulled from the queue by some component * is considered to be unacknowledged if it has been pulled from the queue by some component
@ -151,45 +140,6 @@ public interface FlowFileQueue {
*/ */
void putAll(Collection<FlowFileRecord> files); void putAll(Collection<FlowFileRecord> files);
/**
* Removes all records from the internal swap queue and returns them.
*
* @return all removed records from internal swap queue
*/
// TODO: REMOVE THIS?
List<FlowFileRecord> pollSwappableRecords();
/**
* Restores the records from swap space into this queue, adding the records
* that have expired to the given set instead of enqueuing them.
*
* @param records that were swapped in
*/
// TODO: REMOVE THIS?
void putSwappedRecords(Collection<FlowFileRecord> records);
/**
* Updates the internal counters of how much data is queued, based on
* swapped data that is being restored.
*
* @param numRecords count of records swapped in
* @param contentSize total size of records being swapped in
*/
// TODO: REMOVE THIS?
void incrementSwapCount(int numRecords, long contentSize);
/**
* @return the number of FlowFiles that are enqueued and not swapped
*/
// TODO: REMOVE THIS?
int unswappedSize();
// TODO: REMOVE THIS?
int getSwapRecordCount();
// TODO: REMOVE THIS?
int getSwapQueueSize();
/** /**
* @param expiredRecords expired records * @param expiredRecords expired records
* @return the next flow file on the queue; null if empty * @return the next flow file on the queue; null if empty

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.controller.queue; package org.apache.nifi.controller.queue;
import java.text.NumberFormat;
/** /**
* *
*/ */
@ -45,4 +47,9 @@ public class QueueSize {
public long getByteCount() { public long getByteCount() {
return totalSizeBytes; return totalSizeBytes;
} }
@Override
public String toString() {
return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
}
} }

View File

@ -26,6 +26,9 @@ import org.apache.nifi.controller.queue.QueueSize;
* Defines a mechanism by which FlowFiles can be move into external storage or * Defines a mechanism by which FlowFiles can be move into external storage or
* memory so that they can be removed from the Java heap and vice-versa * memory so that they can be removed from the Java heap and vice-versa
*/ */
// TODO: This needs to be refactored into two different mechanisms, one that is responsible for doing
// framework-y types of things, such as updating the repositories, and another that is responsible
// for serializing and deserializing FlowFiles to external storage.
public interface FlowFileSwapManager { public interface FlowFileSwapManager {
/** /**
@ -37,6 +40,16 @@ public interface FlowFileSwapManager {
*/ */
void initialize(SwapManagerInitializationContext initializationContext); void initialize(SwapManagerInitializationContext initializationContext);
/**
* Drops all FlowFiles that are swapped out at the given location. This will update the Provenance
* Repository as well as the FlowFile Repository and
*
* @param swapLocation the location of the swap file to drop
* @param flowFileQueue the queue to which the FlowFiles belong
* @param user the user that initiated the request
*/
void dropSwappedFlowFiles(String swapLocation, FlowFileQueue flowFileQueue, String user) throws IOException;
/** /**
* Swaps out the given FlowFiles that belong to the queue with the given identifier. * Swaps out the given FlowFiles that belong to the queue with the given identifier.
* *
@ -53,13 +66,13 @@ public interface FlowFileSwapManager {
* provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
* at the given location remains in that location and the FlowFile Repository is not updated. * at the given location remains in that location and the FlowFile Repository is not updated.
* *
* @param swapLocation the location of hte swap file * @param swapLocation the location of the swap file
* @param flowFileQueue the queue that the FlowFiles belong to * @param flowFileQueue the queue that the FlowFiles belong to
* @return the FlowFiles that live at the given swap location * @return the FlowFiles that live at the given swap location
* *
* @throws IOException if unable to recover the FlowFiles from the given location * @throws IOException if unable to recover the FlowFiles from the given location
*/ */
List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException; List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException;
/** /**
* Recovers the FlowFiles from the swap file that lives at the given location and belongs * Recovers the FlowFiles from the swap file that lives at the given location and belongs

View File

@ -27,7 +27,6 @@ public interface SwapManagerInitializationContext {
*/ */
FlowFileRepository getFlowFileRepository(); FlowFileRepository getFlowFileRepository();
/** /**
* @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when
* performing swapping actions * performing swapping actions

View File

@ -33,12 +33,13 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.junit.Assert; import org.junit.Assert;
public class MockFlowFile implements FlowFile { public class MockFlowFile implements FlowFileRecord {
private final Map<String, String> attributes = new HashMap<>(); private final Map<String, String> attributes = new HashMap<>();
@ -170,7 +171,7 @@ public class MockFlowFile implements FlowFile {
public void assertAttributeNotExists(final String attributeName) { public void assertAttributeNotExists(final String attributeName) {
Assert.assertFalse("Attribute " + attributeName + " not exists with value " + attributes.get(attributeName), Assert.assertFalse("Attribute " + attributeName + " not exists with value " + attributes.get(attributeName),
attributes.containsKey(attributeName)); attributes.containsKey(attributeName));
} }
public void assertAttributeEquals(final String attributeName, final String expectedValue) { public void assertAttributeEquals(final String attributeName, final String expectedValue) {
@ -250,7 +251,7 @@ public class MockFlowFile implements FlowFile {
if ((fromStream & 0xFF) != (data[i] & 0xFF)) { if ((fromStream & 0xFF) != (data[i] & 0xFF)) {
Assert.fail("FlowFile content differs from input at byte " + bytesRead + " with input having value " Assert.fail("FlowFile content differs from input at byte " + bytesRead + " with input having value "
+ (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF)); + (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF));
} }
bytesRead++; bytesRead++;
@ -274,4 +275,19 @@ public class MockFlowFile implements FlowFile {
public Long getLastQueueDate() { public Long getLastQueueDate() {
return entryDate; return entryDate;
} }
@Override
public long getPenaltyExpirationMillis() {
return -1;
}
@Override
public ContentClaim getContentClaim() {
return null;
}
@Override
public long getContentClaimOffset() {
return 0;
}
} }

View File

@ -23,7 +23,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.processor.QueueSize; import org.apache.nifi.controller.queue.QueueSize;
public class MockFlowFileQueue { public class MockFlowFileQueue {

View File

@ -40,12 +40,12 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException; import org.apache.nifi.processor.exception.FlowFileHandlingException;
@ -691,7 +691,7 @@ public class MockProcessSession implements ProcessSession {
/** /**
* @param relationship to get flowfiles for * @param relationship to get flowfiles for
* @return a List of FlowFiles in the order in which they were transferred * @return a List of FlowFiles in the order in which they were transferred
* to the given relationship * to the given relationship
*/ */
public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
final Relationship procRel = new Relationship.Builder().name(relationship).build(); final Relationship procRel = new Relationship.Builder().name(relationship).build();
@ -778,7 +778,7 @@ public class MockProcessSession implements ProcessSession {
*/ */
private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) { private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) {
if (source == null || destination == null || source == destination) { if (source == null || destination == null || source == destination) {
return destination; //don't need to inherit from ourselves return destination; // don't need to inherit from ourselves
} }
final FlowFile updated = putAllAttributes(destination, source.getAttributes()); final FlowFile updated = putAllAttributes(destination, source.getAttributes());
getProvenanceReporter().fork(source, Collections.singletonList(updated)); getProvenanceReporter().fork(source, Collections.singletonList(updated));
@ -801,7 +801,7 @@ public class MockProcessSession implements ProcessSession {
int uuidsCaptured = 0; int uuidsCaptured = 0;
for (final FlowFile source : sources) { for (final FlowFile source : sources) {
if (source == destination) { if (source == destination) {
continue; //don't want to capture parent uuid of this. Something can't be a child of itself continue; // don't want to capture parent uuid of this. Something can't be a child of itself
} }
final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key()); final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key());
if (sourceUuid != null && !sourceUuid.trim().isEmpty()) { if (sourceUuid != null && !sourceUuid.trim().isEmpty()) {
@ -832,7 +832,7 @@ public class MockProcessSession implements ProcessSession {
*/ */
private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
final Map<String, String> result = new HashMap<>(); final Map<String, String> result = new HashMap<>();
//trivial cases // trivial cases
if (flowFileList == null || flowFileList.isEmpty()) { if (flowFileList == null || flowFileList.isEmpty()) {
return result; return result;
} else if (flowFileList.size() == 1) { } else if (flowFileList.size() == 1) {
@ -845,8 +845,7 @@ public class MockProcessSession implements ProcessSession {
*/ */
final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
outer: outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
final String key = mapEntry.getKey(); final String key = mapEntry.getKey();
final String value = mapEntry.getValue(); final String value = mapEntry.getValue();
for (final FlowFile flowFile : flowFileList) { for (final FlowFile flowFile : flowFileList) {
@ -900,7 +899,7 @@ public class MockProcessSession implements ProcessSession {
public void assertTransferCount(final Relationship relationship, final int count) { public void assertTransferCount(final Relationship relationship, final int count) {
final int transferCount = getFlowFilesForRelationship(relationship).size(); final int transferCount = getFlowFilesForRelationship(relationship).size();
Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to " Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to "
+ relationship + " but actual transfer count was " + transferCount, count, transferCount); + relationship + " but actual transfer count was " + transferCount, count, transferCount);
} }
/** /**

View File

@ -58,12 +58,12 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.ProvenanceReporter;
@ -222,7 +222,7 @@ public class StandardProcessorTestRunner implements TestRunner {
boolean unscheduledRun = false; boolean unscheduledRun = false;
for (final Future<Throwable> future : futures) { for (final Future<Throwable> future : futures) {
try { try {
final Throwable thrown = future.get(); // wait for the result final Throwable thrown = future.get(); // wait for the result
if (thrown != null) { if (thrown != null) {
throw new AssertionError(thrown); throw new AssertionError(thrown);
} }
@ -551,11 +551,11 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override @Override
public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException { public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
// hold off on failing due to deprecated annotation for now... will introduce later. // hold off on failing due to deprecated annotation for now... will introduce later.
// for ( final Method method : service.getClass().getMethods() ) { // for ( final Method method : service.getClass().getMethods() ) {
// if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) { // if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) {
// Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method); // Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method);
// } // }
// } // }
final ComponentLog logger = new MockProcessorLog(identifier, service); final ComponentLog logger = new MockProcessorLog(identifier, service);
final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger); final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger);
@ -716,11 +716,11 @@ public class StandardProcessorTestRunner implements TestRunner {
final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName); final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName);
if (descriptor == null) { if (descriptor == null) {
return new ValidationResult.Builder() return new ValidationResult.Builder()
.input(propertyName) .input(propertyName)
.explanation(propertyName + " is not a known Property for Controller Service " + service) .explanation(propertyName + " is not a known Property for Controller Service " + service)
.subject("Invalid property") .subject("Invalid property")
.valid(false) .valid(false)
.build(); .build();
} }
return setProperty(service, descriptor, value); return setProperty(service, descriptor, value);
} }

View File

@ -26,11 +26,11 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.ProvenanceReporter;
@ -40,22 +40,22 @@ public interface TestRunner {
/** /**
* @return the {@link Processor} for which this <code>TestRunner</code> is * @return the {@link Processor} for which this <code>TestRunner</code> is
* configured * configured
*/ */
Processor getProcessor(); Processor getProcessor();
/** /**
* @return the {@link ProcessSessionFactory} that this * @return the {@link ProcessSessionFactory} that this
* <code>TestRunner</code> will use to invoke the * <code>TestRunner</code> will use to invoke the
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method
*/ */
ProcessSessionFactory getProcessSessionFactory(); ProcessSessionFactory getProcessSessionFactory();
/** /**
* @return the {@Link ProcessContext} that this <code>TestRunner</code> will * @return the {@Link ProcessContext} that this <code>TestRunner</code> will
* use to invoke the * use to invoke the
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}
* method * method
*/ */
ProcessContext getProcessContext(); ProcessContext getProcessContext();
@ -120,7 +120,7 @@ public interface TestRunner {
* *
* @param iterations number of iterations * @param iterations number of iterations
* @param stopOnFinish whether or not to run the Processor methods that are * @param stopOnFinish whether or not to run the Processor methods that are
* annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped} * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped}
* @param initialize true if must initialize * @param initialize true if must initialize
*/ */
void run(int iterations, boolean stopOnFinish, final boolean initialize); void run(int iterations, boolean stopOnFinish, final boolean initialize);
@ -163,10 +163,10 @@ public interface TestRunner {
* *
* @param iterations number of iterations * @param iterations number of iterations
* @param stopOnFinish whether or not to run the Processor methods that are * @param stopOnFinish whether or not to run the Processor methods that are
* annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped} * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped}
* @param initialize true if must initialize * @param initialize true if must initialize
* @param runWait indicates the amount of time in milliseconds that the framework should wait for * @param runWait indicates the amount of time in milliseconds that the framework should wait for
* processors to stop running before calling the {@link org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation * processors to stop running before calling the {@link org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
*/ */
void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait); void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait);
@ -187,8 +187,8 @@ public interface TestRunner {
/** /**
* @return the currently configured number of threads that will be used to * @return the currently configured number of threads that will be used to
* runt he Processor when calling the {@link #run()} or {@link #run(int)} * runt he Processor when calling the {@link #run()} or {@link #run(int)}
* methods * methods
*/ */
int getThreadCount(); int getThreadCount();
@ -296,7 +296,7 @@ public interface TestRunner {
/** /**
* @return <code>true</code> if the Input Queue to the Processor is empty, * @return <code>true</code> if the Input Queue to the Processor is empty,
* <code>false</code> otherwise * <code>false</code> otherwise
*/ */
boolean isQueueEmpty(); boolean isQueueEmpty();
@ -421,7 +421,7 @@ public interface TestRunner {
/** /**
* @return the {@link ProvenanceReporter} that will be used by the * @return the {@link ProvenanceReporter} that will be used by the
* configured {@link Processor} for reporting Provenance Events * configured {@link Processor} for reporting Provenance Events
*/ */
ProvenanceReporter getProvenanceReporter(); ProvenanceReporter getProvenanceReporter();
@ -433,7 +433,7 @@ public interface TestRunner {
/** /**
* @param name of counter * @param name of counter
* @return the current value of the counter with the specified name, or null * @return the current value of the counter with the specified name, or null
* if no counter exists with the specified name * if no counter exists with the specified name
*/ */
Long getCounterValue(String name); Long getCounterValue(String name);
@ -599,14 +599,14 @@ public interface TestRunner {
/** /**
* @param service the service * @param service the service
* @return {@code true} if the given Controller Service is enabled, * @return {@code true} if the given Controller Service is enabled,
* {@code false} if it is disabled * {@code false} if it is disabled
* *
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
*/ */
boolean isControllerServiceEnabled(ControllerService service); boolean isControllerServiceEnabled(ControllerService service);
@ -622,11 +622,11 @@ public interface TestRunner {
* *
* @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalStateException if the ControllerService is not disabled
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
* *
*/ */
void removeControllerService(ControllerService service); void removeControllerService(ControllerService service);
@ -641,11 +641,11 @@ public interface TestRunner {
* *
* @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalStateException if the ControllerService is not disabled
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
* *
*/ */
ValidationResult setProperty(ControllerService service, PropertyDescriptor property, String value); ValidationResult setProperty(ControllerService service, PropertyDescriptor property, String value);
@ -660,11 +660,11 @@ public interface TestRunner {
* *
* @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalStateException if the ControllerService is not disabled
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
* *
*/ */
ValidationResult setProperty(ControllerService service, PropertyDescriptor property, AllowableValue value); ValidationResult setProperty(ControllerService service, PropertyDescriptor property, AllowableValue value);
@ -679,11 +679,11 @@ public interface TestRunner {
* *
* @throws IllegalStateException if the ControllerService is not disabled * @throws IllegalStateException if the ControllerService is not disabled
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
* *
*/ */
ValidationResult setProperty(ControllerService service, String propertyName, String value); ValidationResult setProperty(ControllerService service, String propertyName, String value);
@ -698,19 +698,19 @@ public interface TestRunner {
* @throws IllegalStateException if the Controller Service is not disabled * @throws IllegalStateException if the Controller Service is not disabled
* *
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
*/ */
void setAnnotationData(ControllerService service, String annotationData); void setAnnotationData(ControllerService service, String annotationData);
/** /**
* @param identifier of controller service * @param identifier of controller service
* @return the {@link ControllerService} that is registered with the given * @return the {@link ControllerService} that is registered with the given
* identifier, or <code>null</code> if no Controller Service exists with the * identifier, or <code>null</code> if no Controller Service exists with the
* given identifier * given identifier
*/ */
ControllerService getControllerService(String identifier); ControllerService getControllerService(String identifier);
@ -720,11 +720,11 @@ public interface TestRunner {
* *
* @param service the service to validate * @param service the service to validate
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
*/ */
void assertValid(ControllerService service); void assertValid(ControllerService service);
@ -734,11 +734,11 @@ public interface TestRunner {
* *
* @param service the service to validate * @param service the service to validate
* @throws IllegalArgumentException if the given ControllerService is not * @throws IllegalArgumentException if the given ControllerService is not
* known by this TestRunner (i.e., it has not been added via the * known by this TestRunner (i.e., it has not been added via the
* {@link #addControllerService(String, ControllerService)} or * {@link #addControllerService(String, ControllerService)} or
* {@link #addControllerService(String, ControllerService, Map)} method or * {@link #addControllerService(String, ControllerService, Map)} method or
* if the Controller Service has been removed via the * if the Controller Service has been removed via the
* {@link #removeControllerService(ControllerService)} method. * {@link #removeControllerService(ControllerService)} method.
* *
*/ */
void assertNotValid(ControllerService service); void assertNotValid(ControllerService service);
@ -748,12 +748,12 @@ public interface TestRunner {
* @param identifier identifier of service * @param identifier identifier of service
* @param serviceType type of service * @param serviceType type of service
* @return the {@link ControllerService} that is registered with the given * @return the {@link ControllerService} that is registered with the given
* identifier, cast as the provided service type, or <code>null</code> if no * identifier, cast as the provided service type, or <code>null</code> if no
* Controller Service exists with the given identifier * Controller Service exists with the given identifier
* *
* @throws ClassCastException if the identifier given is registered for a * @throws ClassCastException if the identifier given is registered for a
* Controller Service but that Controller Service is not of the type * Controller Service but that Controller Service is not of the type
* specified * specified
*/ */
<T extends ControllerService> T getControllerService(String identifier, Class<T> serviceType); <T extends ControllerService> T getControllerService(String identifier, Class<T> serviceType);

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
public class DropFlowFileRequest implements DropFlowFileStatus {
private final String identifier;
private final long submissionTime = System.currentTimeMillis();
private volatile QueueSize originalSize;
private volatile QueueSize currentSize;
private volatile long lastUpdated = System.currentTimeMillis();
private volatile Thread executionThread;
private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
public DropFlowFileRequest(final String identifier) {
this.identifier = identifier;
}
@Override
public String getRequestIdentifier() {
return identifier;
}
@Override
public long getRequestSubmissionTime() {
return submissionTime;
}
@Override
public QueueSize getOriginalSize() {
return originalSize;
}
void setOriginalSize(final QueueSize originalSize) {
this.originalSize = originalSize;
}
@Override
public QueueSize getCurrentSize() {
return currentSize;
}
void setCurrentSize(final QueueSize queueSize) {
this.currentSize = currentSize;
}
@Override
public DropFlowFileState getState() {
return state;
}
@Override
public long getLastUpdated() {
return lastUpdated;
}
synchronized void setState(final DropFlowFileState state) {
this.state = state;
this.lastUpdated = System.currentTimeMillis();
}
void setExecutionThread(final Thread thread) {
this.executionThread = thread;
}
synchronized boolean cancel() {
if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
return false;
}
this.state = DropFlowFileState.CANCELED;
if (executionThread != null) {
executionThread.interrupt();
}
return true;
}
}

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.controller; package org.apache.nifi.controller;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@ -24,9 +25,13 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -35,25 +40,32 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.concurrency.TimedLock; import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -67,15 +79,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000; public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
public static final int SWAP_RECORD_POLL_SIZE = 10000; public static final int SWAP_RECORD_POLL_SIZE = 10000;
// When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck. In order to avoid this,
// we keep track of how often we are obtaining the write lock. If we exceed some threshold, we start performing a Pre-fetch so that
// we can then poll many times without having to obtain the lock.
// If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to poll from queue for last 5 seconds, do a pre-fetch.
public static final int PREFETCH_POLL_THRESHOLD = 1000;
public static final int PRIORITIZED_PREFETCH_SIZE = 10;
public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000;
private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch?
private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
private PriorityQueue<FlowFileRecord> activeQueue = null; private PriorityQueue<FlowFileRecord> activeQueue = null;
@ -97,9 +100,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final List<FlowFilePrioritizer> priorities; private final List<FlowFilePrioritizer> priorities;
private final int swapThreshold; private final int swapThreshold;
private final FlowFileSwapManager swapManager; private final FlowFileSwapManager swapManager;
private final List<String> swapLocations = new ArrayList<>();
private final TimedLock readLock; private final TimedLock readLock;
private final TimedLock writeLock; private final TimedLock writeLock;
private final String identifier; private final String identifier;
private final FlowFileRepository flowFileRepository;
private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager;
private final AtomicBoolean queueFullRef = new AtomicBoolean(false); private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0); private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
@ -108,8 +115,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
private final ProcessScheduler scheduler; private final ProcessScheduler scheduler;
public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
final int swapThreshold) { final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>(); priorities = new ArrayList<>();
maximumQueueObjectCount = 0L; maximumQueueObjectCount = 0L;
@ -120,6 +127,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swapQueue = new ArrayList<>(); swapQueue = new ArrayList<>();
this.eventReporter = eventReporter; this.eventReporter = eventReporter;
this.swapManager = swapManager; this.swapManager = swapManager;
this.flowFileRepository = flowFileRepo;
this.provRepository = provRepo;
this.resourceClaimManager = resourceClaimManager;
this.identifier = identifier; this.identifier = identifier;
this.swapThreshold = swapThreshold; this.swapThreshold = swapThreshold;
@ -140,11 +150,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return Collections.unmodifiableList(priorities); return Collections.unmodifiableList(priorities);
} }
@Override
public int getSwapThreshold() {
return swapThreshold;
}
@Override @Override
public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
writeLock.lock(); writeLock.lock();
@ -154,12 +159,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
activeQueue = newQueue; activeQueue = newQueue;
priorities.clear(); priorities.clear();
priorities.addAll(newPriorities); priorities.addAll(newPriorities);
if (newPriorities.isEmpty()) {
prefetchSize = UNPRIORITIZED_PREFETCH_SIZE;
} else {
prefetchSize = PRIORITIZED_PREFETCH_SIZE;
}
} finally { } finally {
writeLock.unlock("setPriorities"); writeLock.unlock("setPriorities");
} }
@ -225,33 +224,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
*/ */
private QueueSize getQueueSize() { private QueueSize getQueueSize() {
final QueueSize unacknowledged = unacknowledgedSizeRef.get(); final QueueSize unacknowledged = unacknowledgedSizeRef.get();
final PreFetch preFetch = preFetchRef.get();
final int preFetchCount; return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount(),
final long preFetchSize; activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount());
if (preFetch == null) {
preFetchCount = 0;
preFetchSize = 0L;
} else {
final QueueSize preFetchQueueSize = preFetch.size();
preFetchCount = preFetchQueueSize.getObjectCount();
preFetchSize = preFetchQueueSize.getByteCount();
}
return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount,
activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize);
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
readLock.lock(); readLock.lock();
try { try {
final PreFetch prefetch = preFetchRef.get(); return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
if (prefetch == null) {
return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
} else {
return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0 && prefetch.size().getObjectCount() == 0;
}
} finally { } finally {
readLock.unlock("isEmpty"); readLock.unlock("isEmpty");
} }
@ -260,30 +242,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public boolean isActiveQueueEmpty() { public boolean isActiveQueueEmpty() {
final int activeQueueSize = activeQueueSizeRef.get(); final int activeQueueSize = activeQueueSizeRef.get();
if (activeQueueSize == 0) { return activeQueueSize == 0;
final PreFetch preFetch = preFetchRef.get();
if (preFetch == null) {
return true;
}
final QueueSize queueSize = preFetch.size();
return queueSize.getObjectCount() == 0;
} else {
return false;
}
} }
@Override
public QueueSize getActiveQueueSize() { public QueueSize getActiveQueueSize() {
readLock.lock(); readLock.lock();
try { try {
final PreFetch preFetch = preFetchRef.get(); return new QueueSize(activeQueue.size(), activeQueueContentSize);
if (preFetch == null) {
return new QueueSize(activeQueue.size(), activeQueueContentSize);
} else {
final QueueSize preFetchSize = preFetch.size();
return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount());
}
} finally { } finally {
readLock.unlock("getActiveQueueSize"); readLock.unlock("getActiveQueueSize");
} }
@ -374,6 +339,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swappedContentSize += file.getSize(); swappedContentSize += file.getSize();
swappedRecordCount++; swappedRecordCount++;
swapMode = true; swapMode = true;
writeSwapFilesIfNecessary();
} else { } else {
activeQueueContentSize += file.getSize(); activeQueueContentSize += file.getSize();
activeQueue.add(file); activeQueue.add(file);
@ -405,6 +371,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swappedContentSize += bytes; swappedContentSize += bytes;
swappedRecordCount += numFiles; swappedRecordCount += numFiles;
swapMode = true; swapMode = true;
writeSwapFilesIfNecessary();
} else { } else {
activeQueueContentSize += bytes; activeQueueContentSize += bytes;
activeQueue.addAll(files); activeQueue.addAll(files);
@ -421,116 +388,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
} }
@Override
public List<FlowFileRecord> pollSwappableRecords() {
writeLock.lock();
try {
if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
return null;
}
final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
final Iterator<FlowFileRecord> itr = swapQueue.iterator();
while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
final FlowFileRecord record = itr.next();
swapRecords.add(record);
itr.remove();
}
swapQueue.trimToSize();
return swapRecords;
} finally {
writeLock.unlock("pollSwappableRecords");
}
}
@Override
public void putSwappedRecords(final Collection<FlowFileRecord> records) {
writeLock.lock();
try {
try {
for (final FlowFileRecord record : records) {
swappedContentSize -= record.getSize();
swappedRecordCount--;
activeQueueContentSize += record.getSize();
activeQueue.add(record);
}
if (swappedRecordCount > swapQueue.size()) {
// we have more swap files to be swapped in.
return;
}
// If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix
if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
for (final FlowFileRecord record : swapQueue) {
activeQueue.add(record);
activeQueueContentSize += record.getSize();
}
swapQueue.clear();
swappedContentSize = 0L;
swappedRecordCount = 0;
swapMode = false;
}
} finally {
activeQueueSizeRef.set(activeQueue.size());
}
} finally {
writeLock.unlock("putSwappedRecords");
scheduler.registerEvent(connection.getDestination());
}
}
@Override
public void incrementSwapCount(final int numRecords, final long contentSize) {
writeLock.lock();
try {
swappedContentSize += contentSize;
swappedRecordCount += numRecords;
} finally {
writeLock.unlock("incrementSwapCount");
}
}
@Override
public int unswappedSize() {
readLock.lock();
try {
return activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount();
} finally {
readLock.unlock("unswappedSize");
}
}
@Override
public int getSwapRecordCount() {
readLock.lock();
try {
return swappedRecordCount;
} finally {
readLock.unlock("getSwapRecordCount");
}
}
@Override
public int getSwapQueueSize() {
readLock.lock();
try {
if (logger.isDebugEnabled()) {
final long byteToMbDivisor = 1024L * 1024L;
final QueueSize unacknowledged = unacknowledgedSizeRef.get();
logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB",
activeQueue.size(), activeQueueContentSize / byteToMbDivisor,
swappedRecordCount, swappedContentSize / byteToMbDivisor,
unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor);
}
return swapQueue.size();
} finally {
readLock.unlock("getSwapQueueSize");
}
}
private boolean isLaterThan(final Long maxAge) { private boolean isLaterThan(final Long maxAge) {
if (maxAge == null) { if (maxAge == null) {
@ -558,30 +415,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// First check if we have any records Pre-Fetched. // First check if we have any records Pre-Fetched.
final long expirationMillis = flowFileExpirationMillis.get(); final long expirationMillis = flowFileExpirationMillis.get();
final PreFetch preFetch = preFetchRef.get();
if (preFetch != null) {
if (preFetch.isExpired()) {
requeueExpiredPrefetch(preFetch);
} else {
while (true) {
final FlowFileRecord next = preFetch.nextRecord();
if (next == null) {
break;
}
if (isLaterThan(getExpirationDate(next, expirationMillis))) {
expiredRecords.add(next);
continue;
}
updateUnacknowledgedSize(1, next.getSize());
return next;
}
preFetchRef.compareAndSet(preFetch, null);
}
}
writeLock.lock(); writeLock.lock();
try { try {
flowFile = doPoll(expiredRecords, expirationMillis); flowFile = doPoll(expiredRecords, expirationMillis);
@ -631,9 +464,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
queueFullRef.set(determineIfFull()); queueFullRef.set(determineIfFull());
} }
if (incrementPollCount()) {
prefetch();
}
return isExpired ? null : flowFile; return isExpired ? null : flowFile;
} }
@ -642,38 +472,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults)); final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults));
// First check if we have any records Pre-Fetched. // First check if we have any records Pre-Fetched.
final long expirationMillis = flowFileExpirationMillis.get();
final PreFetch preFetch = preFetchRef.get();
if (preFetch != null) {
if (preFetch.isExpired()) {
requeueExpiredPrefetch(preFetch);
} else {
long totalSize = 0L;
for (int i = 0; i < maxResults; i++) {
final FlowFileRecord next = preFetch.nextRecord();
if (next == null) {
break;
}
if (isLaterThan(getExpirationDate(next, expirationMillis))) {
expiredRecords.add(next);
continue;
}
records.add(next);
totalSize += next.getSize();
}
// If anything was prefetched, use what we have.
if (!records.isEmpty()) {
updateUnacknowledgedSize(records.size(), totalSize);
return records;
}
preFetchRef.compareAndSet(preFetch, null);
}
}
writeLock.lock(); writeLock.lock();
try { try {
doPoll(records, maxResults, expiredRecords); doPoll(records, maxResults, expiredRecords);
@ -705,10 +503,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (queueFullAtStart && !expiredRecords.isEmpty()) { if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull()); queueFullRef.set(determineIfFull());
} }
if (incrementPollCount()) {
prefetch();
}
} }
/** /**
@ -732,6 +526,46 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
// to disk, because we want them to be swapped back in in the same order that they were swapped out. // to disk, because we want them to be swapped back in in the same order that they were swapped out.
if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
return;
}
// If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that
// were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the
// swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot
// keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
// first.
if (!swapLocations.isEmpty()) {
final String swapLocation = swapLocations.remove(0);
try {
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this);
swappedRecordCount -= swappedIn.size();
long swapSize = 0L;
for (final FlowFileRecord flowFile : swappedIn) {
swapSize += flowFile.getSize();
}
swappedContentSize -= swapSize;
activeQueueContentSize += swapSize;
activeQueueSizeRef.set(activeQueue.size());
activeQueue.addAll(swappedIn);
return;
} catch (final FileNotFoundException fnfe) {
logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
}
return;
} catch (final IOException ioe) {
logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
logger.error("", ioe);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
}
return;
}
}
// this is the most common condition (nothing is swapped out), so do the check first and avoid the expense // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
// of other checks for 99.999% of the cases. // of other checks for 99.999% of the cases.
if (swappedRecordCount == 0 && swapQueue.isEmpty()) { if (swappedRecordCount == 0 && swapQueue.isEmpty()) {
@ -760,6 +594,69 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
} }
/**
* This method MUST be called with the write lock held
*/
private void writeSwapFilesIfNecessary() {
if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
return;
}
final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
// Create a new Priority queue with the prioritizers that are set, but reverse the
// prioritizers because we want to pull the lowest-priority FlowFiles to swap out
final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities)));
tempQueue.addAll(activeQueue);
tempQueue.addAll(swapQueue);
final List<String> swapLocations = new ArrayList<>(numSwapFiles);
for (int i = 0; i < numSwapFiles; i++) {
// Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
toSwap.add(tempQueue.poll());
}
try {
Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
final String swapLocation = swapManager.swapOut(toSwap, this);
swapLocations.add(swapLocation);
} catch (final IOException ioe) {
tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting "
+ "the Java heap space but failed to write information to disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), ioe.toString());
logger.error("", ioe);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has " + getQueueSize().getObjectCount() +
" queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. "
+ "See logs for more information.");
}
break;
}
}
// Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
// swap queue. Then add the records back to the active queue.
swapQueue.clear();
while (tempQueue.size() > swapThreshold) {
final FlowFileRecord record = tempQueue.poll();
swapQueue.add(record);
}
Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
// replace the contents of the active queue, since we've merged it with the swap queue.
activeQueue.clear();
FlowFileRecord toRequeue;
while ((toRequeue = tempQueue.poll()) != null) {
activeQueue.offer(toRequeue);
}
this.swapLocations.addAll(swapLocations);
}
@Override @Override
public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) { public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
long drainedSize = 0L; long drainedSize = 0L;
@ -796,13 +693,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>(); final List<FlowFileRecord> unselected = new ArrayList<>();
// the prefetch doesn't allow us to add records back. So when this method is used,
// if there are prefetched records, we have to requeue them into the active queue first.
final PreFetch prefetch = preFetchRef.get();
if (prefetch != null) {
requeueExpiredPrefetch(prefetch);
}
while (true) { while (true) {
FlowFileRecord flowFile = this.activeQueue.poll(); FlowFileRecord flowFile = this.activeQueue.poll();
if (flowFile == null) { if (flowFile == null) {
@ -856,6 +746,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
} }
private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable { private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ -991,6 +883,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
this.swappedRecordCount = swapFlowFileCount; this.swappedRecordCount = swapFlowFileCount;
this.swappedContentSize = swapByteCount; this.swappedContentSize = swapByteCount;
this.swapLocations.addAll(swapLocations);
} finally { } finally {
writeLock.unlock("Recover Swap Files"); writeLock.unlock("Recover Swap Files");
} }
@ -1004,22 +897,190 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return "FlowFileQueue[id=" + identifier + "]"; return "FlowFileQueue[id=" + identifier + "]";
} }
private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
@Override @Override
public DropFlowFileStatus dropFlowFiles() { public DropFlowFileStatus dropFlowFiles() {
// TODO Auto-generated method stub // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
return null; if (dropRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>();
for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
final DropFlowFileRequest request = entry.getValue();
final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
toDrop.add(entry.getKey());
}
}
for (final String requestId : toDrop) {
dropRequestMap.remove(requestId);
}
}
// TODO: get user name!
final String userName = null;
final String requestIdentifier = UUID.randomUUID().toString();
final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
writeLock.lock();
try {
dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
drop(activeQueueRecords, userName);
activeQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
drop(swapQueue, userName);
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next();
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
try {
drop(swappedIn, userName);
} catch (final Exception e) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
throw e;
}
dropRequest.setCurrentSize(getQueueSize());
swapLocationItr.remove();
}
dropRequest.setState(DropFlowFileState.COMPLETE);
} catch (final Exception e) {
// TODO: Handle adequately
dropRequest.setState(DropFlowFileState.FAILURE);
}
} finally {
writeLock.unlock("Drop FlowFiles");
}
}
}, "Drop FlowFiles for Connection " + getIdentifier());
t.setDaemon(true);
t.start();
dropRequest.setExecutionThread(t);
dropRequestMap.put(requestIdentifier, dropRequest);
return dropRequest;
}
private void drop(final List<FlowFileRecord> flowFiles, final String user) throws IOException {
// Create a Provenance Event and a FlowFile Repository record for each FlowFile
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
for (final FlowFileRecord flowFile : flowFiles) {
provenanceEvents.add(createDropEvent(flowFile, user));
flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
}
for (final FlowFileRecord flowFile : flowFiles) {
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim == null) {
continue;
}
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
if (resourceClaim == null) {
continue;
}
resourceClaimManager.decrementClaimantCount(resourceClaim);
}
provRepository.registerEvents(provenanceEvents);
flowFileRepository.updateRepository(flowFileRepoRecords);
}
private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) {
final ProvenanceEventBuilder builder = provRepository.eventBuilder();
builder.fromFlowFile(flowFile);
builder.setEventType(ProvenanceEventType.DROP);
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(getIdentifier());
builder.setComponentType("Connection");
builder.setDetails("FlowFile manually dropped by user " + user);
return builder.build();
}
private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
return new RepositoryRecord() {
@Override
public FlowFileQueue getDestination() {
return null;
}
@Override
public FlowFileQueue getOriginalQueue() {
return StandardFlowFileQueue.this;
}
@Override
public RepositoryRecordType getType() {
return RepositoryRecordType.DELETE;
}
@Override
public ContentClaim getCurrentClaim() {
return flowFile.getContentClaim();
}
@Override
public ContentClaim getOriginalClaim() {
return flowFile.getContentClaim();
}
@Override
public long getCurrentClaimOffset() {
return flowFile.getContentClaimOffset();
}
@Override
public FlowFileRecord getCurrent() {
return flowFile;
}
@Override
public boolean isAttributesChanged() {
return false;
}
@Override
public boolean isMarkedForAbort() {
return false;
}
@Override
public String getSwapLocation() {
return null;
}
};
}
@Override
public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
if (request == null) {
return false;
}
final boolean successful = request.cancel();
return successful;
} }
@Override @Override
public boolean cancelDropFlowFileRequest(String requestIdentifier) { public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
// TODO Auto-generated method stub return dropRequestMap.get(requestIdentifier);
return false;
}
@Override
public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
// TODO Auto-generated method stub
return null;
} }
/** /**
@ -1051,126 +1112,4 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
} while (!updated); } while (!updated);
} }
private void requeueExpiredPrefetch(final PreFetch prefetch) {
if (prefetch == null) {
return;
}
writeLock.lock();
try {
final long contentSizeRequeued = prefetch.requeue(activeQueue);
this.activeQueueContentSize += contentSizeRequeued;
this.preFetchRef.compareAndSet(prefetch, null);
} finally {
writeLock.unlock("requeueExpiredPrefetch");
}
}
/**
* MUST be called with write lock held.
*/
private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>();
private void prefetch() {
if (activeQueue.isEmpty()) {
return;
}
final int numToFetch = Math.min(prefetchSize, activeQueue.size());
final PreFetch curPreFetch = preFetchRef.get();
if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) {
return;
}
final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch);
long contentSize = 0L;
for (int i = 0; i < numToFetch; i++) {
final FlowFileRecord record = activeQueue.poll();
if (record == null || record.isPenalized()) {
// not enough unpenalized records to pull. Put all records back and return
activeQueue.addAll(buffer);
if (record != null) {
activeQueue.add(record);
}
return;
} else {
buffer.add(record);
contentSize += record.getSize();
}
}
activeQueueContentSize -= contentSize;
preFetchRef.set(new PreFetch(buffer));
}
private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess());
private boolean incrementPollCount() {
pollCounts.add(new TimestampedLong(1L));
final long totalCount = pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue();
return totalCount > PREFETCH_POLL_THRESHOLD * 5;
}
private static class PreFetch {
private final List<FlowFileRecord> records;
private final AtomicInteger pointer = new AtomicInteger(0);
private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
private final AtomicLong contentSize = new AtomicLong(0L);
public PreFetch(final List<FlowFileRecord> records) {
this.records = records;
long totalSize = 0L;
for (final FlowFileRecord record : records) {
totalSize += record.getSize();
}
contentSize.set(totalSize);
}
public FlowFileRecord nextRecord() {
final int nextValue = pointer.getAndIncrement();
if (nextValue >= records.size()) {
return null;
}
final FlowFileRecord flowFile = records.get(nextValue);
contentSize.addAndGet(-flowFile.getSize());
return flowFile;
}
public QueueSize size() {
final int pointerIndex = pointer.get();
final int count = records.size() - pointerIndex;
if (count < 0) {
return new QueueSize(0, 0L);
}
final long bytes = contentSize.get();
return new QueueSize(count, bytes);
}
public boolean isExpired() {
return System.nanoTime() > expirationTime;
}
private long requeue(final Queue<FlowFileRecord> queue) {
// get the current pointer and prevent any other thread from accessing the rest of the elements
final int curPointer = pointer.getAndAdd(records.size());
if (curPointer < records.size() - 1) {
final List<FlowFileRecord> subList = records.subList(curPointer, records.size());
long contentSize = 0L;
for (final FlowFileRecord record : subList) {
contentSize += record.getSize();
}
queue.addAll(subList);
return contentSize;
}
return 0L;
}
}
} }

View File

@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
@Before
public void setup() {
final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
swapManager = new TestSwapManager();
final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L);
}
@Test
public void testSwapOutOccurs() {
for (int i = 0; i < 10000; i++) {
queue.put(new TestFlowFile());
assertEquals(0, swapManager.swapOutCalledCount);
assertEquals(i + 1, queue.size().getObjectCount());
assertEquals(i + 1, queue.size().getByteCount());
}
for (int i = 0; i < 9999; i++) {
queue.put(new TestFlowFile());
assertEquals(0, swapManager.swapOutCalledCount);
assertEquals(i + 10001, queue.size().getObjectCount());
assertEquals(i + 10001, queue.size().getByteCount());
}
queue.put(new TestFlowFile(1000));
assertEquals(1, swapManager.swapOutCalledCount);
assertEquals(20000, queue.size().getObjectCount());
assertEquals(20999, queue.size().getByteCount());
assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
}
@Test
public void testLowestPrioritySwappedOutFirst() {
final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
prioritizers.add(new FlowFileSizePrioritizer());
queue.setPriorities(prioritizers);
long maxSize = 20000;
for (int i = 1; i <= 20000; i++) {
queue.put(new TestFlowFile(maxSize - i));
}
assertEquals(1, swapManager.swapOutCalledCount);
assertEquals(20000, queue.size().getObjectCount());
assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
assertEquals(10000, flowFiles.size());
for (int i = 0; i < 10000; i++) {
assertEquals(i, flowFiles.get(i).getSize());
}
}
@Test
public void testSwapIn() {
for (int i = 1; i <= 20000; i++) {
queue.put(new TestFlowFile());
}
assertEquals(1, swapManager.swappedOut.size());
queue.put(new TestFlowFile());
assertEquals(1, swapManager.swappedOut.size());
final Set<FlowFileRecord> exp = new HashSet<>();
for (int i = 0; i < 9999; i++) {
assertNotNull(queue.poll(exp));
}
assertEquals(0, swapManager.swapInCalledCount);
assertEquals(1, queue.getActiveQueueSize().getObjectCount());
assertNotNull(queue.poll(exp));
assertEquals(0, swapManager.swapInCalledCount);
assertEquals(0, queue.getActiveQueueSize().getObjectCount());
assertEquals(1, swapManager.swapOutCalledCount);
assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
assertEquals(1, swapManager.swapInCalledCount);
assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
assertTrue(swapManager.swappedOut.isEmpty());
queue.poll(exp);
}
private class TestSwapManager implements FlowFileSwapManager {
private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
int swapOutCalledCount = 0;
int swapInCalledCount = 0;
@Override
public void initialize(final SwapManagerInitializationContext initializationContext) {
}
@Override
public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
swapOutCalledCount++;
final String location = UUID.randomUUID().toString();
swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
return location;
}
@Override
public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation));
}
@Override
public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
swapInCalledCount++;
return swappedOut.remove(swapLocation);
}
@Override
public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
return new ArrayList<String>(swappedOut.keySet());
}
@Override
public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
}
@Override
public QueueSize getSwapSize(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
if (flowFiles == null) {
return new QueueSize(0, 0L);
}
int count = 0;
long size = 0L;
for (final FlowFileRecord flowFile : flowFiles) {
count++;
size += flowFile.getSize();
}
return new QueueSize(count, size);
}
@Override
public Long getMaxRecordId(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
if (flowFiles == null) {
return null;
}
Long max = null;
for (final FlowFileRecord flowFile : flowFiles) {
if (max == null || flowFile.getId() > max) {
max = flowFile.getId();
}
}
return max;
}
@Override
public void purge() {
swappedOut.clear();
}
}
private static class TestFlowFile implements FlowFileRecord {
private static final AtomicLong idGenerator = new AtomicLong(0L);
private final long id = idGenerator.getAndIncrement();
private final long entryDate = System.currentTimeMillis();
private final Map<String, String> attributes;
private final long size;
public TestFlowFile() {
this(1L);
}
public TestFlowFile(final long size) {
this(new HashMap<String, String>(), size);
}
public TestFlowFile(final Map<String, String> attributes, final long size) {
this.attributes = attributes;
this.size = size;
}
@Override
public long getId() {
return id;
}
@Override
public long getEntryDate() {
return entryDate;
}
@Override
public long getLineageStartDate() {
return entryDate;
}
@Override
public Long getLastQueueDate() {
return null;
}
@Override
public Set<String> getLineageIdentifiers() {
return Collections.emptySet();
}
@Override
public boolean isPenalized() {
return false;
}
@Override
public String getAttribute(String key) {
return attributes.get(key);
}
@Override
public long getSize() {
return size;
}
@Override
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(attributes);
}
@Override
public int compareTo(final FlowFile o) {
return Long.compare(id, o.getId());
}
@Override
public long getPenaltyExpirationMillis() {
return 0;
}
@Override
public ContentClaim getContentClaim() {
return null;
}
@Override
public long getContentClaimOffset() {
return 0;
}
}
private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
return Long.compare(o1.getSize(), o2.getSize());
}
}
}

View File

@ -32,11 +32,14 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
/** /**
@ -66,7 +69,8 @@ public final class StandardConnection implements Connection {
destination = new AtomicReference<>(builder.destination); destination = new AtomicReference<>(builder.destination);
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler; scheduler = builder.scheduler;
flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
} }
@ -262,6 +266,9 @@ public final class StandardConnection implements Connection {
private Collection<Relationship> relationships; private Collection<Relationship> relationships;
private FlowFileSwapManager swapManager; private FlowFileSwapManager swapManager;
private EventReporter eventReporter; private EventReporter eventReporter;
private FlowFileRepository flowFileRepository;
private ProvenanceEventRepository provenanceRepository;
private ResourceClaimManager resourceClaimManager;
public Builder(final ProcessScheduler scheduler) { public Builder(final ProcessScheduler scheduler) {
this.scheduler = scheduler; this.scheduler = scheduler;
@ -318,6 +325,21 @@ public final class StandardConnection implements Connection {
return this; return this;
} }
public Builder flowFileRepository(final FlowFileRepository flowFileRepository) {
this.flowFileRepository = flowFileRepository;
return this;
}
public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) {
this.provenanceRepository = provenanceRepository;
return this;
}
public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) {
this.resourceClaimManager = resourceClaimManager;
return this;
}
public StandardConnection build() { public StandardConnection build() {
if (source == null) { if (source == null) {
throw new IllegalStateException("Cannot build a Connection without a Source"); throw new IllegalStateException("Cannot build a Connection without a Source");
@ -328,6 +350,15 @@ public final class StandardConnection implements Connection {
if (swapManager == null) { if (swapManager == null) {
throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager"); throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
} }
if (flowFileRepository == null) {
throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository");
}
if (provenanceRepository == null) {
throw new IllegalStateException("Cannot build a Connection without a Provenance Repository");
}
if (resourceClaimManager == null) {
throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager");
}
if (relationships == null) { if (relationships == null) {
relationships = new ArrayList<>(); relationships = new ArrayList<>();

View File

@ -28,6 +28,7 @@ import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -79,7 +80,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private EventReporter eventReporter; private EventReporter eventReporter;
private ResourceClaimManager claimManager; private ResourceClaimManager claimManager;
public FileSystemSwapManager() { public FileSystemSwapManager() {
final NiFiProperties properties = NiFiProperties.getInstance(); final NiFiProperties properties = NiFiProperties.getInstance();
final Path flowFileRepoPath = properties.getFlowFileRepositoryPath(); final Path flowFileRepoPath = properties.getFlowFileRepositoryPath();
@ -111,6 +111,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) { try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
fos.getFD().sync(); fos.getFD().sync();
} catch (final IOException ioe) {
// we failed to write out the entire swap file. Delete the temporary file, if we can.
swapTempFile.delete();
throw ioe;
} }
if (swapTempFile.renameTo(swapFile)) { if (swapTempFile.renameTo(swapFile)) {
@ -133,25 +137,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"); warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
} }
// TODO: When FlowFile Queue performs this operation, it needs to take the following error handling logic into account:
/*
* } catch (final EOFException eof) {
* error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
*
* if (!swapFile.delete()) {
* warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
* }
* } catch (final FileNotFoundException fnfe) {
* error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
* } catch (final Exception e) {
* error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
*
* if (swapFile != null) {
* queue.add(swapFile);
* }
* }
*/
return swappedFlowFiles; return swappedFlowFiles;
} }
@ -165,7 +150,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final List<FlowFileRecord> swappedFlowFiles; final List<FlowFileRecord> swappedFlowFiles;
try (final InputStream fis = new FileInputStream(swapFile); try (final InputStream fis = new FileInputStream(swapFile);
final DataInputStream in = new DataInputStream(fis)) { final DataInputStream in = new DataInputStream(fis)) {
swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, swapLocation, claimManager); swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
} }
return swappedFlowFiles; return swappedFlowFiles;
@ -188,6 +173,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
@Override
public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException {
}
@Override @Override
public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException { public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() { final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
@ -322,7 +313,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException { public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
if (toSwap == null || toSwap.isEmpty()) { if (toSwap == null || toSwap.isEmpty()) {
return 0; return 0;
} }
@ -396,8 +387,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
return toSwap.size(); return toSwap.size();
} }
private void writeString(final String toWrite, final OutputStream out) throws IOException { private static void writeString(final String toWrite, final OutputStream out) throws IOException {
final byte[] bytes = toWrite.getBytes("UTF-8"); final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
final int utflen = bytes.length; final int utflen = bytes.length;
if (utflen < 65535) { if (utflen < 65535) {
@ -415,26 +406,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final String swapLocation, final ResourceClaimManager claimManager) throws IOException { static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
final int swapEncodingVersion = in.readInt(); final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) { if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
} }
final String connectionId = in.readUTF(); final String connectionId = in.readUTF(); // Connection ID
if (!connectionId.equals(queue.getIdentifier())) { if (!connectionId.equals(queue.getIdentifier())) {
throw new IllegalArgumentException("Cannot restore contents from FlowFile Swap File " + swapLocation + throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation +
" because the file indicates that records belong to Connection with ID " + connectionId + " but attempted to swap those records into " + queue); " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
} }
final int numRecords = in.readInt(); final int numRecords = in.readInt();
in.readLong(); // Content Size in.readLong(); // Content Size
if (swapEncodingVersion > 7) {
in.readLong(); // Max Record ID
}
return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager); return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
} }
static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, private static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException { final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
final List<FlowFileRecord> flowFiles = new ArrayList<>(); final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; i < numFlowFiles; i++) { for (int i = 0; i < numFlowFiles; i++) {
@ -543,7 +537,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
final byte[] bytes = new byte[numBytes]; final byte[] bytes = new byte[numBytes];
fillBuffer(in, bytes, numBytes); fillBuffer(in, bytes, numBytes);
return new String(bytes, "UTF-8"); return new String(bytes, StandardCharsets.UTF_8);
} }
private static Integer readFieldLength(final InputStream in) throws IOException { private static Integer readFieldLength(final InputStream in) throws IOException {

View File

@ -286,7 +286,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final NodeProtocolSender protocolSender; private final NodeProtocolSender protocolSender;
private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager(); private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
// guarded by rwLock // guarded by rwLock
/** /**
@ -393,7 +393,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager); final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager);
flowFileRepository = flowFileRepo; flowFileRepository = flowFileRepo;
flowFileEventRepository = flowFileEventRepo; flowFileEventRepository = flowFileEventRepo;
counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
@ -668,7 +668,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try { try {
final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class); final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class);
synchronized (contentRepo) { synchronized (contentRepo) {
contentRepo.initialize(contentClaimManager); contentRepo.initialize(resourceClaimManager);
} }
return contentRepo; return contentRepo;
} catch (final Exception e) { } catch (final Exception e) {
@ -728,11 +728,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Create and initialize a FlowFileSwapManager for this connection // Create and initialize a FlowFileSwapManager for this connection
final FlowFileSwapManager swapManager = createSwapManager(properties); final FlowFileSwapManager swapManager = createSwapManager(properties);
final EventReporter eventReporter = createEventReporter(getBulletinRepository()); final EventReporter eventReporter = createEventReporter(getBulletinRepository());
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() { final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
@Override @Override
public ResourceClaimManager getResourceClaimManager() { public ResourceClaimManager getResourceClaimManager() {
return getResourceClaimManager(); return resourceClaimManager;
} }
@Override @Override
@ -756,6 +757,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.destination(destination) .destination(destination)
.swapManager(swapManager) .swapManager(swapManager)
.eventReporter(eventReporter) .eventReporter(eventReporter)
.resourceClaimManager(resourceClaimManager)
.flowFileRepository(flowFileRepository)
.provenanceRepository(provenanceEventRepository)
.build(); .build();
} }
@ -3188,7 +3192,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalArgumentException("Input Content Claim not specified"); throw new IllegalArgumentException("Input Content Claim not specified");
} }
final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
provEvent.getPreviousContentClaimIdentifier(), false); provEvent.getPreviousContentClaimIdentifier(), false);
claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset()); claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset(); offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
@ -3198,7 +3202,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalArgumentException("Output Content Claim not specified"); throw new IllegalArgumentException("Output Content Claim not specified");
} }
final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
provEvent.getContentClaimIdentifier(), false); provEvent.getContentClaimIdentifier(), false);
claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset()); claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
@ -3247,7 +3251,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} }
try { try {
final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false); final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset()); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
if (!contentRepository.isAccessible(contentClaim)) { if (!contentRepository.isAccessible(contentClaim)) {
@ -3327,17 +3331,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} }
// Create the ContentClaim // Create the ContentClaim
final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
// Increment Claimant Count, since we will now be referencing the Content Claim // Increment Claimant Count, since we will now be referencing the Content Claim
contentClaimManager.incrementClaimantCount(resourceClaim); resourceClaimManager.incrementClaimantCount(resourceClaim);
final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue(); final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset); final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize()); contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
if (!contentRepository.isAccessible(contentClaim)) { if (!contentRepository.isAccessible(contentClaim)) {
contentClaimManager.decrementClaimantCount(resourceClaim); resourceClaimManager.decrementClaimantCount(resourceClaim);
throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository"); throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
} }

View File

@ -81,9 +81,11 @@ import org.slf4j.LoggerFactory;
* <p> * <p>
* Provides a ProcessSession that ensures all accesses, changes and transfers * Provides a ProcessSession that ensures all accesses, changes and transfers
* occur in an atomic manner for all FlowFiles including their contents and * occur in an atomic manner for all FlowFiles including their contents and
* attributes</p> * attributes
* </p>
* <p> * <p>
* NOT THREAD SAFE</p> * NOT THREAD SAFE
* </p>
* <p/> * <p/>
*/ */
public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher { public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
@ -104,7 +106,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<String, Long> globalCounters = new HashMap<>(); private final Map<String, Long> globalCounters = new HashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>(); private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
private final ProcessContext context; private final ProcessContext context;
private final Set<FlowFile> recursionSet = new HashSet<>();//set used to track what is currently being operated on to prevent logic failures if recursive calls occurring private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
private final Set<Path> deleteOnCommit = new HashSet<>(); private final Set<Path> deleteOnCommit = new HashSet<>();
private final long sessionId; private final long sessionId;
private final String connectableDescription; private final String connectableDescription;
@ -114,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final StandardProvenanceReporter provenanceReporter; private final StandardProvenanceReporter provenanceReporter;
private int removedCount = 0; // number of flowfiles removed in this session private int removedCount = 0; // number of flowfiles removed in this session
private long removedBytes = 0L; // size of all flowfiles removed in this session private long removedBytes = 0L; // size of all flowfiles removed in this session
private final LongHolder bytesRead = new LongHolder(0L); private final LongHolder bytesRead = new LongHolder(0L);
private final LongHolder bytesWritten = new LongHolder(0L); private final LongHolder bytesWritten = new LongHolder(0L);
@ -169,7 +171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType, this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
context.getProvenanceRepository(), this); context.getProvenanceRepository(), this);
this.sessionId = idGenerator.getAndIncrement(); this.sessionId = idGenerator.getAndIncrement();
this.connectableDescription = description; this.connectableDescription = description;
@ -196,7 +198,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// Processor-reported events. // Processor-reported events.
List<ProvenanceEventRecord> autoTerminatedEvents = null; List<ProvenanceEventRecord> autoTerminatedEvents = null;
//validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>(); final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
for (final StandardRepositoryRecord record : records.values()) { for (final StandardRepositoryRecord record : records.values()) {
if (record.isMarkedForDelete()) { if (record.isMarkedForDelete()) {
@ -235,11 +237,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
} }
} else { } else {
final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
record.setDestination(finalDestination.getFlowFileQueue()); record.setDestination(finalDestination.getFlowFileQueue());
incrementConnectionInputCounts(finalDestination, record); incrementConnectionInputCounts(finalDestination, record);
for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
incrementConnectionInputCounts(destination, record); incrementConnectionInputCounts(destination, record);
final FlowFileRecord currRec = record.getCurrent(); final FlowFileRecord currRec = record.getCurrent();
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec); final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
@ -256,7 +258,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (claim != null) { if (claim != null) {
context.getContentRepository().incrementClaimaintCount(claim); context.getContentRepository().incrementClaimaintCount(claim);
} }
newRecord.setWorking(clone, Collections.<String, String>emptyMap()); newRecord.setWorking(clone, Collections.<String, String> emptyMap());
newRecord.setDestination(destination.getFlowFileQueue()); newRecord.setDestination(destination.getFlowFileQueue());
newRecord.setTransferRelationship(record.getTransferRelationship()); newRecord.setTransferRelationship(record.getTransferRelationship());
@ -322,9 +324,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable(); final Connectable connectable = context.getConnectable();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
//records which have been updated - remove original if exists // records which have been updated - remove original if exists
removeContent(record.getOriginalClaim()); removeContent(record.getOriginalClaim());
} }
} }
@ -356,7 +358,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>(); final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
for (final StandardRepositoryRecord record : checkpoint.records.values()) { for (final StandardRepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForAbort() || record.isMarkedForDelete()) { if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
continue; //these don't need to be transferred continue; // these don't need to be transferred
} }
// record.getCurrent() will return null if this record was created in this session -- // record.getCurrent() will return null if this record was created in this session --
// in this case, we just ignore it, and it will be cleaned up by clearing the records map. // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
@ -390,7 +392,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
final String sessionSummary = summarizeEvents(checkpoint); final String sessionSummary = summarizeEvents(checkpoint);
if (!sessionSummary.isEmpty()) { if (!sessionSummary.isEmpty()) {
LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary}); LOG.info("{} for {}, committed the following events: {}", new Object[] {this, connectableDescription, sessionSummary});
} }
} }
@ -611,9 +613,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean creationEventRegistered = false; boolean creationEventRegistered = false;
if (registeredTypes != null) { if (registeredTypes != null) {
if (registeredTypes.contains(ProvenanceEventType.CREATE) if (registeredTypes.contains(ProvenanceEventType.CREATE)
|| registeredTypes.contains(ProvenanceEventType.FORK) || registeredTypes.contains(ProvenanceEventType.FORK)
|| registeredTypes.contains(ProvenanceEventType.JOIN) || registeredTypes.contains(ProvenanceEventType.JOIN)
|| registeredTypes.contains(ProvenanceEventType.RECEIVE)) { || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
creationEventRegistered = true; creationEventRegistered = true;
} }
} }
@ -747,7 +749,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
private StandardProvenanceEventRecord enrich( private StandardProvenanceEventRecord enrich(
final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) { final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent); final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid()); final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) { if (eventFlowFile != null) {
@ -1039,7 +1041,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StringBuilder sb = new StringBuilder(512); final StringBuilder sb = new StringBuilder(512);
if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
|| numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) { || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
if (numCreated > 0) { if (numCreated > 0) {
sb.append("created ").append(numCreated).append(" FlowFiles, "); sb.append("created ").append(numCreated).append(" FlowFiles, ");
} }
@ -1097,7 +1099,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void formatNanos(final long nanos, final StringBuilder sb) { private void formatNanos(final long nanos, final StringBuilder sb) {
final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L; final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
long millis = nanos > 1000000L ? nanos / 1000000L : 0L;; long millis = nanos > 1000000L ? nanos / 1000000L : 0L;
;
final long nanosLeft = nanos % 1000000L; final long nanosLeft = nanos % 1000000L;
if (seconds > 0) { if (seconds > 0) {
@ -1272,7 +1275,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
int flowFileCount = 0; int flowFileCount = 0;
long byteCount = 0L; long byteCount = 0L;
for (final Connection conn : context.getPollableConnections()) { for (final Connection conn : context.getPollableConnections()) {
final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize(); final QueueSize queueSize = conn.getFlowFileQueue().size();
flowFileCount += queueSize.getObjectCount(); flowFileCount += queueSize.getObjectCount();
byteCount += queueSize.getByteCount(); byteCount += queueSize.getByteCount();
} }
@ -1287,8 +1290,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(attrs) .addAttributes(attrs)
.build(); .build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null); final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, attrs); record.setWorking(fFile, attrs);
records.put(fFile, record); records.put(fFile, record);
@ -1324,7 +1327,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
context.getContentRepository().incrementClaimaintCount(claim); context.getContentRepository().incrementClaimaintCount(claim);
} }
final StandardRepositoryRecord record = new StandardRepositoryRecord(null); final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(clone, Collections.<String, String>emptyMap()); record.setWorking(clone, Collections.<String, String> emptyMap());
records.put(clone, record); records.put(clone, record);
if (offset == 0L && size == example.getSize()) { if (offset == 0L && size == example.getSize()) {
@ -1637,7 +1640,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return; return;
} }
LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()}); LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()});
final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size()); final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
final String processorType; final String processorType;
@ -1650,7 +1653,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
processorType, context.getProvenanceRepository(), this); processorType, context.getProvenanceRepository(), this);
final Map<String, FlowFileRecord> recordIdMap = new HashMap<>(); final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
for (final FlowFileRecord flowFile : flowFiles) { for (final FlowFileRecord flowFile : flowFiles) {
@ -1664,7 +1667,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
} }
try { try {
@ -1696,7 +1699,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
record.getContentClaimOffset() + claim.getOffset(), record.getSize()); record.getContentClaimOffset() + claim.getOffset(), record.getSize());
} }
enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap()); enriched.setAttributes(record.getAttributes(), Collections.<String, String> emptyMap());
return enriched.build(); return enriched.build();
} }
@ -1780,9 +1783,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} }
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@ -1853,7 +1856,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try { try {
try (final OutputStream rawOut = contentRepo.write(newClaim); try (final OutputStream rawOut = contentRepo.write(newClaim);
final OutputStream out = new BufferedOutputStream(rawOut)) { final OutputStream out = new BufferedOutputStream(rawOut)) {
if (header != null && header.length > 0) { if (header != null && header.length > 0) {
out.write(header); out.write(header);
@ -2070,10 +2073,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// the original claim if the record is "working" but the content has not been modified // the original claim if the record is "working" but the content has not been modified
// (e.g., in the case of attributes only were updated) // (e.g., in the case of attributes only were updated)
// In other words: // In other words:
// If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
// return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
// that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
// because we will do that later, in the session.commit() and that would result in removing the original claim twice. // because we will do that later, in the session.commit() and that would result in removing the original claim twice.
if (contentModified) { if (contentModified) {
// In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be // In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim). // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
@ -2196,7 +2199,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override @Override
public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) { public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
validateRecordState(destination); validateRecordState(destination);
//TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true. // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) { if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) {
// If we do NOT want to keep the file, ensure that we can delete it, or else error. // If we do NOT want to keep the file, ensure that we can delete it, or else error.
throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import."); throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import.");
@ -2228,9 +2231,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
removeTemporaryClaim(record); removeTemporaryClaim(record);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
.contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
.addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
.build(); .build();
record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
if (!keepSourceFile) { if (!keepSourceFile) {
deleteOnCommit.add(source); deleteOnCommit.add(source);
@ -2370,7 +2373,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
* *
* @param flowFile the FlowFile to check * @param flowFile the FlowFile to check
* @return <code>true</code> if the FlowFile is known in this session, * @return <code>true</code> if the FlowFile is known in this session,
* <code>false</code> otherwise. * <code>false</code> otherwise.
*/ */
boolean isFlowFileKnown(final FlowFile flowFile) { boolean isFlowFileKnown(final FlowFile flowFile) {
return records.containsKey(flowFile); return records.containsKey(flowFile);
@ -2392,8 +2395,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final String key = entry.getKey(); final String key = entry.getKey();
final String value = entry.getValue(); final String value = entry.getValue();
if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key) if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
|| CoreAttributes.DISCARD_REASON.key().equals(key) || CoreAttributes.DISCARD_REASON.key().equals(key)
|| CoreAttributes.UUID.key().equals(key)) { || CoreAttributes.UUID.key().equals(key)) {
continue; continue;
} }
newAttributes.put(key, value); newAttributes.put(key, value);
@ -2441,10 +2444,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence()) final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes) .addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers) .lineageIdentifiers(lineageIdentifiers)
.lineageStartDate(lineageStartDate) .lineageStartDate(lineageStartDate)
.build(); .build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null); final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, newAttributes); record.setWorking(fFile, newAttributes);
@ -2465,7 +2468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
*/ */
private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
final Map<String, String> result = new HashMap<>(); final Map<String, String> result = new HashMap<>();
//trivial cases // trivial cases
if (flowFileList == null || flowFileList.isEmpty()) { if (flowFileList == null || flowFileList.isEmpty()) {
return result; return result;
} else if (flowFileList.size() == 1) { } else if (flowFileList.size() == 1) {
@ -2478,8 +2481,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
*/ */
final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
outer: outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
final String key = mapEntry.getKey(); final String key = mapEntry.getKey();
final String value = mapEntry.getValue(); final String value = mapEntry.getValue();
for (final FlowFile flowFile : flowFileList) { for (final FlowFile flowFile : flowFileList) {
@ -2539,7 +2541,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Set<String> removedFlowFiles = new HashSet<>(); private final Set<String> removedFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>(); private final Set<String> createdFlowFiles = new HashSet<>();
private int removedCount = 0; // number of flowfiles removed in this session private int removedCount = 0; // number of flowfiles removed in this session
private long removedBytes = 0L; // size of all flowfiles removed in this session private long removedBytes = 0L; // size of all flowfiles removed in this session
private long bytesRead = 0L; private long bytesRead = 0L;
private long bytesWritten = 0L; private long bytesWritten = 0L;

View File

@ -26,7 +26,7 @@ public class Connectables {
public static boolean flowFilesQueued(final Connectable connectable) { public static boolean flowFilesQueued(final Connectable connectable) {
for (final Connection conn : connectable.getIncomingConnections()) { for (final Connection conn : connectable.getIncomingConnections()) {
if (!conn.getFlowFileQueue().isActiveQueueEmpty()) { if (!conn.getFlowFileQueue().isEmpty()) {
return true; return true;
} }
} }

View File

@ -22,16 +22,26 @@ import java.io.BufferedInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -47,7 +57,7 @@ public class TestFileSystemSwapManager {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4"); Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, "/src/test/resources/old-swap-file.swap", new NopResourceClaimManager()); final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
assertEquals(10000, records.size()); assertEquals(10000, records.size());
for (final FlowFileRecord record : records) { for (final FlowFileRecord record : records) {
@ -57,6 +67,53 @@ public class TestFileSystemSwapManager {
} }
} }
@Test
public void testRoundTripSerializeDeserialize() throws IOException {
final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
final Map<String, String> attrs = new HashMap<>();
for (int i = 0; i < 10000; i++) {
attrs.put("i", String.valueOf(i));
final FlowFileRecord ff = new TestFlowFile(attrs, i);
toSwap.add(ff);
}
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final String swapLocation = "target/testRoundTrip.swap";
final File swapFile = new File(swapLocation);
Files.deleteIfExists(swapFile.toPath());
try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
}
final List<FlowFileRecord> swappedIn;
try (final FileInputStream fis = new FileInputStream(swapFile);
final DataInputStream dis = new DataInputStream(fis)) {
swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
}
assertEquals(toSwap.size(), swappedIn.size());
for (int i = 0; i < toSwap.size(); i++) {
final FlowFileRecord pre = toSwap.get(i);
final FlowFileRecord post = swappedIn.get(i);
assertEquals(pre.getSize(), post.getSize());
assertEquals(pre.getAttributes(), post.getAttributes());
assertEquals(pre.getSize(), post.getSize());
assertEquals(pre.getId(), post.getId());
assertEquals(pre.getContentClaim(), post.getContentClaim());
assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
assertEquals(pre.getEntryDate(), post.getEntryDate());
assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
assertEquals(pre.getLineageIdentifiers(), post.getLineageIdentifiers());
assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
}
}
public class NopResourceClaimManager implements ResourceClaimManager { public class NopResourceClaimManager implements ResourceClaimManager {
@Override @Override
@ -100,4 +157,87 @@ public class TestFileSystemSwapManager {
public void purge() { public void purge() {
} }
} }
private static class TestFlowFile implements FlowFileRecord {
private static final AtomicLong idGenerator = new AtomicLong(0L);
private final long id = idGenerator.getAndIncrement();
private final long entryDate = System.currentTimeMillis();
private final long lastQueueDate = System.currentTimeMillis();
private final Map<String, String> attributes;
private final long size;
public TestFlowFile(final Map<String, String> attributes, final long size) {
this.attributes = attributes;
this.size = size;
}
@Override
public long getId() {
return id;
}
@Override
public long getEntryDate() {
return entryDate;
}
@Override
public long getLineageStartDate() {
return entryDate;
}
@Override
public Long getLastQueueDate() {
return lastQueueDate;
}
@Override
public Set<String> getLineageIdentifiers() {
return Collections.emptySet();
}
@Override
public boolean isPenalized() {
return false;
}
@Override
public String getAttribute(String key) {
return attributes.get(key);
}
@Override
public long getSize() {
return size;
}
@Override
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(attributes);
}
@Override
public int compareTo(final FlowFile o) {
return Long.compare(id, o.getId());
}
@Override
public long getPenaltyExpirationMillis() {
return -1L;
}
@Override
public ContentClaim getContentClaim() {
return null;
}
@Override
public long getContentClaimOffset() {
return 0;
}
}
} }

View File

@ -134,7 +134,7 @@ public class TestStandardProcessSession {
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, swapManager, null, 10000); flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() { Mockito.doAnswer(new Answer<Object>() {

View File

@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.DownloadAuthorization;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
@ -47,9 +52,10 @@ import org.apache.nifi.controller.ContentAvailability;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter; import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentNotFoundException; import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -61,8 +67,8 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
@ -75,7 +81,9 @@ import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.search.SearchTerms; import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.search.SearchContext; import org.apache.nifi.search.SearchContext;
@ -85,6 +93,7 @@ import org.apache.nifi.services.FlowService;
import org.apache.nifi.user.NiFiUser; import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO; import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
@ -104,15 +113,6 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.DownloadableContent;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.DownloadAuthorization;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.security.user.NiFiUserUtils; import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -501,7 +501,7 @@ public class ControllerFacade {
* Site-to-Site communications * Site-to-Site communications
* *
* @return the socket port that the Cluster Manager is listening on for * @return the socket port that the Cluster Manager is listening on for
* Site-to-Site communications * Site-to-Site communications
*/ */
public Integer getClusterManagerRemoteSiteListeningPort() { public Integer getClusterManagerRemoteSiteListeningPort() {
return flowController.getClusterManagerRemoteSiteListeningPort(); return flowController.getClusterManagerRemoteSiteListeningPort();
@ -512,7 +512,7 @@ public class ControllerFacade {
* Manager are secure * Manager are secure
* *
* @return whether or not Site-to-Site communications with the Cluster * @return whether or not Site-to-Site communications with the Cluster
* Manager are secure * Manager are secure
*/ */
public Boolean isClusterManagerRemoteSiteCommsSecure() { public Boolean isClusterManagerRemoteSiteCommsSecure() {
return flowController.isClusterManagerRemoteSiteCommsSecure(); return flowController.isClusterManagerRemoteSiteCommsSecure();
@ -523,7 +523,7 @@ public class ControllerFacade {
* Site-to-Site communications * Site-to-Site communications
* *
* @return the socket port that the local instance is listening on for * @return the socket port that the local instance is listening on for
* Site-to-Site communications * Site-to-Site communications
*/ */
public Integer getRemoteSiteListeningPort() { public Integer getRemoteSiteListeningPort() {
return flowController.getRemoteSiteListeningPort(); return flowController.getRemoteSiteListeningPort();
@ -534,7 +534,7 @@ public class ControllerFacade {
* instance are secure * instance are secure
* *
* @return whether or not Site-to-Site communications with the local * @return whether or not Site-to-Site communications with the local
* instance are secure * instance are secure
*/ */
public Boolean isRemoteSiteCommsSecure() { public Boolean isRemoteSiteCommsSecure() {
return flowController.isRemoteSiteCommsSecure(); return flowController.isRemoteSiteCommsSecure();