mirror of https://github.com/apache/nifi.git
NIFI-6285: Addressed an issue that resulted in swapped data not being swapped back in if the load balancing strategy changed while the data was swapped out; added integration tests for swapping. Testing also surfaced an issue where data swapped out while swap files were being recovered caused the queue size to be wrong, and produced errors about being unable to swap data in because the same swap file was swapped in twice.
This closes #3473. Signed-off-by: Bryan Bende <bbende@apache.org>
parent 595835f6ee
commit f08c2ee43f
@@ -463,6 +463,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         // Use Files.move and convert to Path's instead of File.rename so that we get an IOException on failure that describes why we failed.
         Files.move(existingFile.toPath(), newFile.toPath());

+        logger.debug("Changed Partition for Swap File by renaming from {} to {}", swapLocation, newPartitionName);
         return newFile.getAbsolutePath();
     }
 }
@@ -2650,6 +2650,10 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         return replayEvent;
     }

+    public ResourceClaimManager getResourceClaimManager() {
+        return resourceClaimManager;
+    }
+
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -195,6 +196,8 @@ public class SwappablePriorityQueue {
             final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName);
             swapLocations.add(swapLocation);

+            logger.debug("Successfully wrote out Swap File {} containing {} FlowFiles ({} bytes)", swapLocation, toSwap.size(), bytesSwappedThisIteration);
+
             bytesSwappedOut += bytesSwappedThisIteration;
             flowFilesSwappedOut += toSwap.size();
         } catch (final IOException ioe) {
@@ -255,6 +258,7 @@ public class SwappablePriorityQueue {
         }

         this.swapLocations.addAll(swapLocations);
+        logger.debug("After writing swap files, setting new set of Swap Locations to {}", this.swapLocations);
     }

     private int getFlowFileCount() {
@@ -337,6 +341,7 @@ public class SwappablePriorityQueue {
         boolean partialContents = false;
         SwapContents swapContents;
         try {
+            logger.debug("Attempting to swap in {}; all swap locations = {}", swapLocation, swapLocations);
             swapContents = swapManager.swapIn(swapLocation, flowFileQueue);
             swapLocations.remove(0);
         } catch (final IncompleteSwapFileException isfe) {
@@ -391,7 +396,7 @@ public class SwappablePriorityQueue {
         } else {
             // we swapped in the whole swap file. We can just use the info that we got from the summary.
             incrementActiveQueueSize(flowFileCount, contentSize);
-            logger.debug("Successfully swapped in Swap File {}", swapLocation);
+            logger.debug("Successfully swapped in Swap File {} containing {} FlowFiles ({} bytes)", swapLocation, flowFileCount, contentSize);
         }

         activeQueue.addAll(swapContents.getFlowFiles());
@@ -411,12 +416,12 @@ public class SwappablePriorityQueue {
     }

     public void acknowledge(final FlowFileRecord flowFile) {
-        logger.debug("{} Acknowledging {}", this, flowFile);
+        logger.trace("{} Acknowledging {}", this, flowFile);
         incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
     }

     public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
-        logger.debug("{} Acknowledging {}", this, flowFiles);
+        logger.trace("{} Acknowledging {}", this, flowFiles);
         final long totalSize = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
         incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
     }
@@ -435,7 +440,7 @@ public class SwappablePriorityQueue {
                 activeQueue.add(flowFile);
             }

-            logger.debug("{} put to {}", flowFile, this);
+            logger.trace("{} put to {}", flowFile, this);
         } finally {
             writeLock.unlock("put(FlowFileRecord)");
         }
@@ -460,7 +465,7 @@ public class SwappablePriorityQueue {
                 activeQueue.addAll(flowFiles);
             }

-            logger.debug("{} put to {}", flowFiles, this);
+            logger.trace("{} put to {}", flowFiles, this);
         } finally {
             writeLock.unlock("putAll");
         }
@@ -475,7 +480,7 @@ public class SwappablePriorityQueue {
         flowFile = doPoll(expiredRecords, expirationMillis);

         if (flowFile != null) {
-            logger.debug("{} poll() returning {}", this, flowFile);
+            logger.trace("{} poll() returning {}", this, flowFile);
             incrementUnacknowledgedQueueSize(1, flowFile.getSize());
         }
@@ -535,7 +540,7 @@ public class SwappablePriorityQueue {
         }

         if (!records.isEmpty()) {
-            logger.debug("{} poll() returning {}", this, records);
+            logger.trace("{} poll() returning {}", this, records);
         }

         return records;
@@ -594,7 +599,7 @@ public class SwappablePriorityQueue {
         incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);

         if (!selectedFlowFiles.isEmpty()) {
-            logger.debug("{} poll() returning {}", this, selectedFlowFiles);
+            logger.trace("{} poll() returning {}", this, selectedFlowFiles);
         }

         return selectedFlowFiles;
@@ -816,12 +821,13 @@ public class SwappablePriorityQueue {
         Long maxId = null;
         List<ResourceClaim> resourceClaims = new ArrayList<>();
         final long startNanos = System.nanoTime();
+        int failures = 0;

         writeLock.lock();
         try {
-            final List<String> swapLocations;
+            final List<String> swapLocationsFromSwapManager;
             try {
-                swapLocations = swapManager.recoverSwapLocations(flowFileQueue, swapPartitionName);
+                swapLocationsFromSwapManager = swapManager.recoverSwapLocations(flowFileQueue, swapPartitionName);
             } catch (final IOException ioe) {
                 logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getQueueIdentifier());
                 logger.error("", ioe);
@@ -832,7 +838,14 @@ public class SwappablePriorityQueue {
                 return null;
             }

-            logger.debug("Recovered {} Swap Files for {}: {}", swapLocations.size(), flowFileQueue, swapLocations);
+            // If we have a duplicate of any of the swap location that we already know about, we need to filter those out now.
+            // This can happen when, upon startup, we need to swap data out during the swap file recovery. In this case, we do
+            // not want to include such a swap file in those that we recover, because those have already been accounted for when
+            // they were added to the queue, before being swapped out.
+            final Set<String> swapLocations = new LinkedHashSet<>(swapLocationsFromSwapManager);
+            swapLocations.removeAll(this.swapLocations);
+
+            logger.debug("Swap Manager reports {} Swap Files for {}: {}", swapLocations.size(), flowFileQueue, swapLocations);
             for (final String swapLocation : swapLocations) {
                 try {
                     final SwapSummary summary = swapManager.getSwapSummary(swapLocation);
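The filter above is the core of the double-accounting fix: any location the queue already tracks was counted when its FlowFiles were first enqueued, so recovering it again would inflate the queue size and later trigger a second, failing swap-in. A minimal standalone sketch of the same set-difference idea (file names are illustrative, not the NiFi API):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class SwapLocationDedupSketch {
        public static void main(String[] args) {
            // Locations reported by the swap manager on startup, in recovery order.
            final List<String> reported = Arrays.asList("swap-1.swap", "swap-2.swap", "swap-3.swap");

            // A location the queue already tracks because data was swapped out
            // while recovery was still running; counting it again would double it.
            final Set<String> alreadyKnown = new LinkedHashSet<>(Arrays.asList("swap-2.swap"));

            // LinkedHashSet keeps the reported order; removeAll drops the duplicates.
            final Set<String> toRecover = new LinkedHashSet<>(reported);
            toRecover.removeAll(alreadyKnown);

            System.out.println(toRecover); // prints [swap-1.swap, swap-3.swap]
        }
    }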
@@ -848,7 +861,8 @@ public class SwappablePriorityQueue {
                     swapByteCount += queueSize.getByteCount();
                     resourceClaims.addAll(summary.getResourceClaims());
                 } catch (final IOException ioe) {
-                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
+                    failures++;
+                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation);
+                    logger.error("", ioe);
                     if (eventReporter != null) {
                         eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
@@ -863,9 +877,11 @@ public class SwappablePriorityQueue {
             writeLock.unlock("Recover Swap Files");
         }

-        if (!swapLocations.isEmpty()) {
+        if (swapLocations.isEmpty()) {
+            logger.debug("No swap files were recovered for {}", flowFileQueue);
+        } else {
             final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size(), this, millis);
+            logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size() - failures, this, millis);
         }

         return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
@@ -943,8 +959,14 @@ public class SwappablePriorityQueue {
         writeLock.lock();
         try {
             putAll(queueContents.getActiveFlowFiles());
-            swapLocations.addAll(queueContents.getSwapLocations());
+
+            final List<String> inheritedSwapLocations = queueContents.getSwapLocations();
+            swapLocations.addAll(inheritedSwapLocations);
             incrementSwapQueueSize(queueContents.getSwapSize().getObjectCount(), queueContents.getSwapSize().getByteCount(), queueContents.getSwapLocations().size());
+
+            if (!inheritedSwapLocations.isEmpty()) {
+                logger.debug("Inherited the following swap locations: {}", inheritedSwapLocations);
+            }
         } finally {
             writeLock.unlock("inheritQueueContents");
         }
@@ -985,6 +1007,7 @@ public class SwappablePriorityQueue {
             updated = updateSize(currentSize, updatedSize);
         } while (!updated);

+        logger.debug("Cleared {} to package FlowFile for rebalance to {}", this, newPartitionName);
         return new FlowFileQueueContents(activeRecords, updatedSwapLocations, swapSize);
     } finally {
         writeLock.unlock("packageForRebalance(SwappablePriorityQueue)");
@@ -993,6 +1016,6 @@ public class SwappablePriorityQueue {

     @Override
     public String toString() {
-        return "SwappablePriorityQueue[queueId=" + flowFileQueue.getIdentifier() + "]";
+        return "SwappablePriorityQueue[queueId=" + flowFileQueue.getIdentifier() + ", partition=" + swapPartitionName + "]";
     }
 }
@@ -744,6 +744,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
         putAndGetPartition(flowFile);
     }


     protected QueuePartition putAndGetPartition(final FlowFileRecord flowFile) {
         final QueuePartition partition;
@@ -1160,5 +1161,10 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return "FlowFileQueue[id=" + getIdentifier() + ", Load Balance Strategy=" + getLoadBalanceStrategy() + ", size=" + size() + "]";
+    }
 }
@@ -79,7 +79,7 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
                 }
             } catch (final Exception e) {
                 eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to communicate with Peer "
                     + client.getNodeIdentifier() + " while trying to load balance data across the cluster due to " + e.toString());
+                logger.error("Failed to communicate with Peer {} while trying to load balance data across the cluster.", client.getNodeIdentifier(), e);
             }
@@ -90,6 +90,9 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
                     logger.trace("Was unable to communicate with any client. Will sleep for 10 milliseconds.");
                     Thread.sleep(10L);
                 }
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
             } catch (final Exception e) {
                 logger.error("Failed to communicate with peer while trying to load balance data across the cluster", e);
                 eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to comunicate with Peer while trying to load balance data across the cluster due to " + e);
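The new catch block is the standard shutdown pattern for long-running loops: re-assert the interrupt flag so the caller or thread pool can observe it, then return. A small illustrative sketch of the pattern, not taken from the commit:

    public class InterruptiblePollLoop implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // Simulate periodic work with a short back-off, as the client task does.
                    Thread.sleep(10L);
                } catch (final InterruptedException ie) {
                    // Swallowing InterruptedException would clear the flag and leave the
                    // thread unstoppable; re-assert it and exit the loop instead.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }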
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.repository;

+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -102,7 +103,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;

-    private final Set<String> swapLocationSuffixes = new HashSet<>(); // guraded by synchronizing on object itself
+    private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself

     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
@@ -288,7 +289,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     @Override
     public boolean isValidSwapLocationSuffix(final String swapLocationSuffix) {
         synchronized (swapLocationSuffixes) {
-            return swapLocationSuffixes.contains(swapLocationSuffix);
+            return swapLocationSuffixes.contains(normalizeSwapLocation(swapLocationSuffix));
         }
     }
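A membership test like this only works if the same normalization is applied when suffixes are added and when they are looked up; the hunks below route both sides through normalizeSwapLocation. A small sketch of that invariant, with a simplified normalizer rather than the NiFi implementation:

    import java.util.HashSet;
    import java.util.Set;

    public class NormalizedLookupSketch {
        // Simplified stand-in: strip any directory prefix and the file extension.
        static String normalize(final String location) {
            final String name = location.substring(location.lastIndexOf('/') + 1);
            final int dot = name.indexOf('.');
            return dot < 0 ? name : name.substring(0, dot);
        }

        public static void main(String[] args) {
            final Set<String> suffixes = new HashSet<>();
            suffixes.add(normalize("/repo/partition-1/queue-abc/1.swap"));

            // Lookup must normalize too, or a bare file name would not match.
            System.out.println(suffixes.contains(normalize("1.swap")));                  // true
            System.out.println(suffixes.contains("/repo/partition-1/queue-abc/1.swap")); // false: raw, un-normalized key
        }
    }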
@@ -370,8 +371,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         // If we have swapped files in or out, we need to ensure that we update our swapLocationSuffixes.
         if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
             synchronized (swapLocationSuffixes) {
-                swapLocationsRemoved.forEach(loc -> swapLocationSuffixes.remove(getLocationSuffix(loc)));
-                swapLocationsAdded.forEach(loc -> swapLocationSuffixes.add(getLocationSuffix(loc)));
+                swapLocationsRemoved.forEach(loc -> swapLocationSuffixes.remove(normalizeSwapLocation(loc)));
+                swapLocationsAdded.forEach(loc -> swapLocationSuffixes.add(normalizeSwapLocation(loc)));
             }
         }
@@ -415,19 +416,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     }


-    protected static String getLocationSuffix(final String swapLocation) {
+    protected static String normalizeSwapLocation(final String swapLocation) {
         if (swapLocation == null) {
             return null;
         }

         final String normalizedPath = swapLocation.replace("\\", "/");
         final String withoutTrailing = (normalizedPath.endsWith("/") && normalizedPath.length() > 1) ? normalizedPath.substring(0, normalizedPath.length() - 1) : normalizedPath;
-        final int lastIndex = withoutTrailing.lastIndexOf("/");
-        if (lastIndex < 0 || lastIndex >= withoutTrailing.length() - 1) {
-            return withoutTrailing;
+        final String pathRemoved = getLocationSuffix(withoutTrailing);
+
+        final String normalized = StringUtils.substringBefore(pathRemoved, ".");
+        return normalized;
+    }
+
+    private static String getLocationSuffix(final String swapLocation) {
+        final int lastIndex = swapLocation.lastIndexOf("/");
+        if (lastIndex < 0 || lastIndex >= swapLocation.length() - 1) {
+            return swapLocation;
         }

-        return withoutTrailing.substring(lastIndex + 1);
+        return swapLocation.substring(lastIndex + 1);
     }

     @Override
@@ -486,7 +494,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         wal.update(repoRecords, true);

         synchronized (this.swapLocationSuffixes) {
-            this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+            this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
         }

         logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{swappedOut.size(), queue, swapLocation});
@@ -507,7 +515,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         updateRepository(repoRecords, true);

         synchronized (this.swapLocationSuffixes) {
-            this.swapLocationSuffixes.add(getLocationSuffix(swapLocation));
+            this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
         }

         logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
@@ -633,7 +641,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis

         final Set<String> recoveredSwapLocations = wal.getRecoveredSwapLocations();
         synchronized (this.swapLocationSuffixes) {
-            recoveredSwapLocations.forEach(loc -> this.swapLocationSuffixes.add(getLocationSuffix(loc)));
+            recoveredSwapLocations.forEach(loc -> this.swapLocationSuffixes.add(normalizeSwapLocation(loc)));
             logger.debug("Recovered {} Swap Files: {}", swapLocationSuffixes.size(), swapLocationSuffixes);
         }
@@ -89,6 +89,10 @@ public class StandardStateManagerProvider implements StateManagerProvider{
         return provider;
     }

+    public static synchronized void resetProvider() {
+        provider = null;
+    }
+
     private static StateProvider createLocalStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager)
             throws IOException, ConfigParseException {
         final File configFile = properties.getStateManagementConfigFile();
@@ -420,26 +424,26 @@ public class StandardStateManagerProvider implements StateManagerProvider{
         try {
             mgr.clear(Scope.CLUSTER);
         } catch (final Exception e) {
-            logger.warn("Component with ID {} was removed from NiFi instance but failed to clear clustered state for the component", e);
+            logger.warn("Component with ID {} was removed from NiFi instance but failed to clear clustered state for the component", componentId, e);
         }

         try {
             mgr.clear(Scope.LOCAL);
         } catch (final Exception e) {
-            logger.warn("Component with ID {} was removed from NiFi instance but failed to clear local state for the component", e);
+            logger.warn("Component with ID {} was removed from NiFi instance but failed to clear local state for the component", componentId, e);
         }

         try {
             localStateProvider.onComponentRemoved(componentId);
         } catch (final Exception e) {
-            logger.warn("Component with ID {} was removed from NiFi instance but failed to cleanup resources used to maintain its local state", e);
+            logger.warn("Component with ID {} was removed from NiFi instance but failed to cleanup resources used to maintain its local state", componentId, e);
         }

         if (clusterStateProvider != null) {
             try {
                 clusterStateProvider.onComponentRemoved(componentId);
             } catch (final Exception e) {
-                logger.warn("Component with ID {} was removed from NiFi instance but failed to cleanup resources used to maintain its clustered state", e);
+                logger.warn("Component with ID {} was removed from NiFi instance but failed to cleanup resources used to maintain its clustered state", componentId, e);
             }
         }
     }
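All four warn calls fix the same SLF4J pitfall: when the final argument is a Throwable, SLF4J pulls it out as the stack trace, so a lone exception argument leaves the {} placeholder unfilled. A short demonstration, assuming an SLF4J binding on the classpath:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Slf4jPlaceholderSketch {
        private static final Logger logger = LoggerFactory.getLogger(Slf4jPlaceholderSketch.class);

        public static void main(String[] args) {
            final Exception e = new RuntimeException("boom");

            // Logs the literal "{}": the lone Throwable is consumed as the stack
            // trace, not as a placeholder value.
            logger.warn("Component with ID {} failed to clear state", e);

            // Fixed form: placeholder value first, Throwable last.
            logger.warn("Component with ID {} failed to clear state", "my-component-id", e);
        }
    }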
@@ -398,17 +398,17 @@ public class TestWriteAheadFlowFileRepository {


     @Test
-    public void testGetLocationSuffix() {
-        assertEquals("/", WriteAheadFlowFileRepository.getLocationSuffix("/"));
-        assertEquals("", WriteAheadFlowFileRepository.getLocationSuffix(""));
-        assertEquals(null, WriteAheadFlowFileRepository.getLocationSuffix(null));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt"));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/test.txt"));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/tmp/test.txt"));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("//test.txt"));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/other/file/repository/test.txt"));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("test.txt/"));
-        assertEquals("test.txt", WriteAheadFlowFileRepository.getLocationSuffix("/path/to/test.txt/"));
+    public void testNormalizeSwapLocation() {
+        assertEquals("/", WriteAheadFlowFileRepository.normalizeSwapLocation("/"));
+        assertEquals("", WriteAheadFlowFileRepository.normalizeSwapLocation(""));
+        assertEquals(null, WriteAheadFlowFileRepository.normalizeSwapLocation(null));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("test.txt"));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("/test.txt"));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("/tmp/test.txt"));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("//test.txt"));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/other/file/repository/test.txt"));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("test.txt/"));
+        assertEquals("test", WriteAheadFlowFileRepository.normalizeSwapLocation("/path/to/test.txt/"));
     }

     @Test
@@ -20,6 +20,13 @@ import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connection;
@@ -28,12 +35,13 @@ import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FileSystemSwapManager;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.StandardFlowSynchronizer;
 import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.flow.StandardFlowManager;
 import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.queue.ConnectionEventListener;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueueFactory;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.StandardFlowFileQueue;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.FileSystemRepository;
@@ -47,15 +55,14 @@ import org.apache.nifi.controller.repository.RepositoryContext;
 import org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
 import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
 import org.apache.nifi.controller.scheduling.SchedulingAgent;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.serialization.FlowSynchronizer;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.VolatileComponentStatusRepository;
@@ -65,8 +72,15 @@ import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.processor.BiConsumerProcessor;
+import org.apache.nifi.integration.processors.GenerateProcessor;
+import org.apache.nifi.integration.processors.NopProcessor;
+import org.apache.nifi.integration.processors.TerminateAll;
+import org.apache.nifi.integration.processors.TerminateOnce;
+import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.SystemBundle;
+import org.apache.nifi.persistence.FlowConfigurationDAO;
+import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
@@ -78,19 +92,28 @@ import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.provenance.WriteAheadProvenanceRepository;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FileUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -100,38 +123,56 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

 public class FrameworkIntegrationTest {
+    private static final Logger logger = LoggerFactory.getLogger(FrameworkIntegrationTest.class);
+
+    //@Rule
+    public Timeout globalTimeout = Timeout.seconds(20);
+
+    @Rule
+    public TestName name = new TestName();
+

     private ResourceClaimManager resourceClaimManager;
     private StandardProcessScheduler processScheduler;

     private FlowEngine flowEngine;
     private FlowController flowController;
-    private FlowRegistryClient flowRegistryClient = null;
+    private FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
     private ProcessorNode nopProcessor;
     private ProcessorNode terminateProcessor;
     private ProcessorNode terminateAllProcessor;
     private FlowFileQueueFactory flowFileQueueFactory;
     private FlowFileSwapManager flowFileSwapManager;
     private DirectInjectionExtensionManager extensionManager;
     private ProcessGroup rootProcessGroup;
     private Bundle systemBundle;
+    private ClusterCoordinator clusterCoordinator;
+    private NiFiProperties nifiProperties;

     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

     @Before
     public void setup() throws IOException {
+        StandardStateManagerProvider.resetProvider();
+
         cleanup();
-        initialize(null);
+        initialize();
+
+        flowController.initializeFlow();
+        createFlow();
     }

     protected String getNiFiPropertiesFilename() {
-        return "src/test/resources/int-tests/default-nifi.properties";
+        if (isClusteredTest()) {
+            return "src/test/resources/int-tests/clustered-nifi.properties";
+        } else {
+            return "src/test/resources/int-tests/default-nifi.properties";
+        }
     }

     protected Map<String, String> getNiFiPropertiesOverrides() {
@@ -142,14 +183,20 @@ public class FrameworkIntegrationTest {
         // Placeholder for subclasses.
     }

-    protected final void initialize(final QueueProvider queueProvider) throws IOException {
-        final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(getNiFiPropertiesFilename(), getNiFiPropertiesOverrides());
-        initialize(nifiProperties, queueProvider);
+    protected final void initialize() throws IOException {
+        final Map<String, String> propertyOverrides = new HashMap<>(getNiFiPropertiesOverrides());
+        if (isClusteredTest()) {
+            propertyOverrides.put(NiFiProperties.CLUSTER_IS_NODE, "true");
+        }
+
+        final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(getNiFiPropertiesFilename(), propertyOverrides);
+        initialize(nifiProperties);
     }

-    protected final void initialize(final NiFiProperties nifiProperties, final QueueProvider queueProvider) throws IOException {
+    protected final void initialize(final NiFiProperties nifiProperties) throws IOException {
+        this.nifiProperties = nifiProperties;
+
         final FlowFileEventRepository flowFileEventRepository = new RingBufferEventRepository(5);
         resourceClaimManager = new StandardResourceClaimManager();

         final BulletinRepository bulletinRepo = new VolatileBulletinRepository();
         flowEngine = new FlowEngine(4, "unit test flow engine");
@@ -161,7 +208,12 @@ public class FrameworkIntegrationTest {
         extensionManager.injectExtensionType(StateProvider.class, WriteAheadLocalStateProvider.class);
         extensionManager.injectExtensionType(ComponentStatusRepository.class, VolatileComponentStatusRepository.class);
         extensionManager.injectExtensionType(FlowFileSwapManager.class, FileSystemSwapManager.class);
+
         extensionManager.injectExtensionType(Processor.class, BiConsumerProcessor.class);
+        extensionManager.injectExtensionType(Processor.class, GenerateProcessor.class);
+        extensionManager.injectExtensionType(Processor.class, TerminateOnce.class);
+        extensionManager.injectExtensionType(Processor.class, TerminateAll.class);
+        extensionManager.injectExtensionType(Processor.class, NopProcessor.class);

         injectExtensionTypes(extensionManager);
         systemBundle = SystemBundle.create(nifiProperties);
@@ -171,8 +223,48 @@ public class FrameworkIntegrationTest {
         final Authorizer authorizer = new AlwaysAuthorizedAuthorizer();
         final AuditService auditService = new NopAuditService();

-        flowController = FlowController.createStandaloneInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
-            VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager);
+        if (isClusteredTest()) {
+            final File zookeeperDir = new File("target/state/zookeeper");
+            final File version2Dir = new File(zookeeperDir, "version-2");
+
+            if (!version2Dir.exists()) {
+                assertTrue(version2Dir.mkdirs());
+            }
+
+            final File[] children = version2Dir.listFiles();
+            if (children != null) {
+                for (final File file : children) {
+                    FileUtils.deleteFile(file, true);
+                }
+            }
+
+            clusterCoordinator = Mockito.mock(ClusterCoordinator.class);
+            final HeartbeatMonitor heartbeatMonitor = Mockito.mock(HeartbeatMonitor.class);
+            final NodeProtocolSender protocolSender = Mockito.mock(NodeProtocolSender.class);
+            final LeaderElectionManager leaderElectionManager = new CuratorLeaderElectionManager(2, nifiProperties);
+
+            final NodeIdentifier localNodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 8111, "localhost", 8081,
+                "localhost", 8082, "localhost", 8083, 8084, false, Collections.emptySet());
+            final NodeIdentifier node2Id = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 8222, "localhost", 8081,
+                "localhost", 8082, "localhost", 8083, 8084, false, Collections.emptySet());
+
+            final Set<NodeIdentifier> nodeIdentifiers = new HashSet<>();
+            nodeIdentifiers.add(localNodeId);
+            nodeIdentifiers.add(node2Id);
+            Mockito.when(clusterCoordinator.getNodeIdentifiers()).thenReturn(nodeIdentifiers);
+            Mockito.when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeId);
+
+            flowController = FlowController.createClusteredInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, protocolSender, bulletinRepo, clusterCoordinator,
+                heartbeatMonitor, leaderElectionManager, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager);
+
+            flowController.setClustered(true, UUID.randomUUID().toString());
+            flowController.setNodeId(localNodeId);
+
+            flowController.setConnectionStatus(new NodeConnectionStatus(localNodeId, NodeConnectionState.CONNECTED));
+        } else {
+            flowController = FlowController.createStandaloneInstance(flowFileEventRepository, nifiProperties, authorizer, auditService, encryptor, bulletinRepo,
+                VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY, flowRegistryClient, extensionManager);
+        }

         processScheduler = new StandardProcessScheduler(flowEngine, flowController, encryptor, flowController.getStateManagerProvider(), nifiProperties);
@@ -180,49 +272,85 @@ public class FrameworkIntegrationTest {
         final SchedulingAgent timerDrivenSchedulingAgent = new TimerDrivenSchedulingAgent(flowController, flowEngine, repositoryContextFactory, encryptor, nifiProperties);
         processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenSchedulingAgent);

-        final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(flowController, processScheduler, bulletinRepo);
+        flowFileSwapManager = flowController.createSwapManager();
+        resourceClaimManager = flowController.getResourceClaimManager();
+    }

+    protected void createFlow() {
         rootProcessGroup = flowController.getFlowManager().createProcessGroup(UUID.randomUUID().toString());
         rootProcessGroup.setName("Integration Test");

         ((StandardFlowManager) flowController.getFlowManager()).setRootGroup(rootProcessGroup);

-        nopProcessor = createProcessorNode((context, session) -> {});
+        nopProcessor = createProcessorNode(NopProcessor.class);
+        terminateProcessor = createProcessorNode(TerminateOnce.class);
+        terminateAllProcessor = createProcessorNode(TerminateAll.class);
+    }

-        terminateProcessor = createProcessorNode((context, session) -> {
-            FlowFile flowFile = session.get();
-            if (flowFile == null) {
-                return;
-            }
-
-            session.remove(flowFile);
-        });
-
-        terminateAllProcessor = createProcessorNode((context, session) -> {
-            FlowFile flowFile;
-            while ((flowFile = session.get()) != null) {
-                session.remove(flowFile);
-            }
-        });
-
-        flowFileSwapManager = flowController.createSwapManager();
-        flowFileQueueFactory = new FlowFileQueueFactory() {
-            @Override
-            public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener connectionEventListener) {
-                return FrameworkIntegrationTest.this.createFlowFileQueue(UUID.randomUUID().toString());
-            }
-        };
-
-        if (queueProvider == null) {
-            flowController.initializeFlow();
-        } else {
-            flowController.initializeFlow(queueProvider);
-        }
+    protected boolean isClusteredTest() {
+        return false;
+    }

+    protected ClusterCoordinator getClusterCoordinator() {
+        return clusterCoordinator;
+    }
+
+    @After
+    public final void shutdown() {
+        logger.info("Shutting down...");
+
+        if (flowController != null) {
+            flowController.shutdown(true);
+        }
+
+        if (flowEngine != null) {
+            flowEngine.shutdownNow();
+        }
+
+        if (processScheduler != null) {
+            processScheduler.shutdown();
+        }
+    }
+
+    protected void restart() throws IOException, ExecutionException, InterruptedException {
+        logger.info("Shutting down for restart....");
+
+        // Save Flow to a byte array
+        final FlowConfigurationDAO flowDao = new StandardXMLFlowConfigurationDAO(Paths.get("target/int-tests/flow.xml.gz"), flowController.getEncryptor(), nifiProperties, getExtensionManager());
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        flowDao.save(flowController, baos);
+        final byte[] flowBytes = baos.toByteArray();
+
+        // Shutdown
+        flowController.shutdown(true);
+        flowEngine.shutdownNow();
+        processScheduler.shutdown();
+        StandardStateManagerProvider.resetProvider();
+
+        // Remove all Log Repositories so that we can restart with the same ID's
+        for (final ProcessorNode procNode : rootProcessGroup.getProcessors()) {
+            LogRepositoryFactory.removeRepository(procNode.getIdentifier());
+        }
+
+        // Re-initialize the framework components
+        initialize();
+
+        // Reload the flow
+        final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(flowController.getEncryptor(), nifiProperties, extensionManager);
+        flowController.synchronize(flowSynchronizer, new StandardDataFlow(flowBytes, null, null, Collections.emptySet()));
+
+        // Reload FlowFiles / initialize flow
+        final ProcessGroup newRootGroup = flowController.getFlowManager().getRootGroup();
+        rootProcessGroup = newRootGroup;
+        final QueueProvider queueProvider = new QueueProvider() {
+            @Override
+            public Collection<FlowFileQueue> getAllQueues() {
+                return newRootGroup.findAllConnections().stream()
+                    .map(Connection::getFlowFileQueue)
+                    .collect(Collectors.toList());
+            }
+        };
+
+        flowController.initializeFlow(queueProvider);
+    }
+
     @After
@@ -325,23 +453,26 @@ public class FrameworkIntegrationTest {
         return processorNode;
     }

-    protected final void connect(final ProcessorNode source, final ProcessorNode destination, final Relationship relationship) {
-        connect(source, destination, Collections.singleton(relationship));
+    protected final Connection connect(final ProcessorNode source, final ProcessorNode destination, final Relationship relationship) {
+        return connect(source, destination, Collections.singleton(relationship));
     }

-    protected final void connect(final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
+    protected final Connection connect(final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
+        final String id = UUID.randomUUID().toString();
         final Connection connection = new StandardConnection.Builder(processScheduler)
             .source(source)
             .destination(destination)
             .relationships(relationships)
-            .id(UUID.randomUUID().toString())
+            .id(id)
             .clustered(false)
-            .flowFileQueueFactory(flowFileQueueFactory)
+            .flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener) -> createFlowFileQueue(id))
             .build();

         source.addConnection(connection);
         destination.addConnection(connection);
         rootProcessGroup.addConnection(connection);
+
+        return connection;
     }

     protected final Future<Void> start(final ProcessorNode procNode) {
@@ -380,7 +511,7 @@ public class FrameworkIntegrationTest {
         return getRepositoryContext().getProvenanceRepository();
     }

-    private RepositoryContext getRepositoryContext() {
+    protected RepositoryContext getRepositoryContext() {
         return flowController.getRepositoryContextFactory().newProcessContext(nopProcessor, new AtomicLong(0L));
     }
@@ -414,31 +545,28 @@ public class FrameworkIntegrationTest {

     protected void triggerOnce(final ProcessorNode processor) throws ExecutionException, InterruptedException {
         final String schedulingPeriod = processor.getSchedulingPeriod();
-        try {
         final FlowFileEvent initialReport = getStatusReport(processor);
         final int initialInvocations = (initialReport == null) ? 0 : initialReport.getInvocations();

         processor.setScheduldingPeriod("1 hour");

         // We will only trigger the Processor to run once per hour. So we need to ensure that
         // we don't trigger the Processor while it's yielded. So if its yield expiration is in the future,
         // wait until the yield expires.
         while (processor.getYieldExpiration() > System.currentTimeMillis()) {
             Thread.sleep(1L);
         }

         start(processor).get();

         int totalInvocations = initialInvocations;
         while (totalInvocations < initialInvocations + 1) {
             final FlowFileEvent currentReport = getStatusReport(processor);
             totalInvocations = currentReport == null ? 0 : currentReport.getInvocations();
         }

         stop(processor).get();
-        } finally {
         processor.setScheduldingPeriod(schedulingPeriod);
-        }
     }

     protected FlowFileEvent getStatusReport(final ProcessorNode processor) {
@@ -75,7 +75,8 @@ public class FlowFileRepositoryLifecycleIT extends FrameworkIntegrationTest {
         shutdown();

         final FlowFileQueue restoredQueue = createFlowFileQueue(queue.getIdentifier());
-        initialize(() -> Collections.singleton(restoredQueue));
+        initialize();
+        getFlowController().initializeFlow(() -> Collections.singleton(restoredQueue));

         for (int i=0; i < queueSize; i++) {
             final FlowFileRecord flowFileRecord = restoredQueue.poll(Collections.emptySet());
@@ -22,12 +22,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;

+import java.util.Collections;
 import java.util.Set;
 import java.util.function.BiConsumer;

 public class BiConsumerProcessor extends AbstractProcessor {
     private BiConsumer<ProcessContext, ProcessSession> trigger;
-    private Set<Relationship> relationships;
+    private volatile Set<Relationship> relationships;

     public void setTrigger(final BiConsumer<ProcessContext, ProcessSession> trigger) {
         this.trigger = trigger;
@@ -39,8 +40,13 @@ public class BiConsumerProcessor extends AbstractProcessor {

     @Override
     public Set<Relationship> getRelationships() {
-        if (relationships == null) {
-            throw new IllegalStateException("Relationships have not been initialized");
+        while (relationships == null) {
+            try {
+                Thread.sleep(1L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return Collections.emptySet();
+            }
         }

         return relationships;
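Making the field volatile matters for the loop above: getRelationships() may now spin on a write made by another thread, and without volatile the Java memory model does not guarantee the spinning thread ever observes it. A minimal illustration of the visibility guarantee (hypothetical class, not part of the commit):

    public class VisibilitySketch {
        // Without volatile, the reader below could loop forever on a stale value.
        private static volatile boolean ready = false;

        public static void main(String[] args) throws InterruptedException {
            final Thread reader = new Thread(() -> {
                while (!ready) {
                    // Busy-wait, like BiConsumerProcessor.getRelationships().
                    Thread.onSpinWait();
                }
                System.out.println("observed the write");
            });
            reader.start();

            Thread.sleep(100L);
            ready = true; // volatile write: guaranteed to become visible to the reader
            reader.join();
        }
    }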
@@ -0,0 +1,81 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.integration.processors;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR;

public class GenerateProcessor extends AbstractProcessor {
    public static final PropertyDescriptor COUNT = new Builder()
        .name("Count")
        .displayName("Count")
        .description("Number of FlowFiles to generate")
        .required(true)
        .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR)
        .defaultValue("1")
        .build();

    public static final PropertyDescriptor CONTENT_SIZE = new Builder()
        .name("Content Size")
        .displayName("Content Size")
        .description("Size of the FlowFile")
        .required(true)
        .addValidator(DATA_SIZE_VALIDATOR)
        .defaultValue("0 B")
        .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Arrays.asList(COUNT, CONTENT_SIZE);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        for (int i=0; i < context.getProperty(COUNT).asInteger(); i++) {
            FlowFile flowFile = session.create();

            final int size = context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue();
            final byte[] data = new byte[size];
            session.write(flowFile, out -> out.write(data));
            session.transfer(flowFile, REL_SUCCESS);
        }
    }
}
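The integration test added later in this commit drives GenerateProcessor purely through its property names; the configuration, as it appears in ClusteredSwapFileIT below, looks like:

    final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
    generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));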
@@ -0,0 +1,28 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.integration.processors;

import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class NopProcessor extends AbstractProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    }
}
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.integration.processors;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class TerminateAll extends AbstractProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile;
        while ((flowFile = session.get()) != null) {
            session.remove(flowFile);
            session.adjustCounter("Removed", 1, false);
        }
    }
}
@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.integration.processors;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class TerminateOnce extends AbstractProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile != null) {
            session.remove(flowFile);
        }
    }
}
@ -0,0 +1,249 @@
|
|||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.integration.swap;

import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.integration.processors.GenerateProcessor;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class ClusteredSwapFileIT extends FrameworkIntegrationTest {

    @Test
    public void testSwapOnRestartWithLoadBalancedConnectionDoNotLoadBalanceStrategy() throws ExecutionException, InterruptedException, IOException {
        final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
        generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));

        Connection connection = connect(generator, getTerminateAllProcessor(), REL_SUCCESS);
        triggerOnce(generator);

        FlowFileQueue queue = connection.getFlowFileQueue();
        QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
        LocalQueuePartitionDiagnostics localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(20_000, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(40_000, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(60_000, queue.size().getObjectCount());

        // restart nifi
        restart();

        // get the new Connection with the same ID
        connection = getRootGroup().getConnection(connection.getIdentifier());
        queue = connection.getFlowFileQueue();
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        // Ensure we have the correct queue sizes
        assertEquals(20_000, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(40_000, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(60_000, queue.size().getObjectCount());

        // Consume all the data
        for (int i = 0; i < 60_000; i++) {
            final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
            assertNotNull(flowFile);

            queue.acknowledge(flowFile);
        }

        assertNull(queue.poll(Collections.emptySet()));

        // Check queue sizes again
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(0, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(0, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(0, queue.size().getObjectCount());
    }


    @Test
    public void testSwapOnRestartWithLoadBalancedConnectionRoundRobinStrategy() throws ExecutionException, InterruptedException, IOException {
        final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
        generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));

        Connection connection = connect(generator, getTerminateAllProcessor(), REL_SUCCESS);
        FlowFileQueue queue = connection.getFlowFileQueue();

        queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);

        final Set<NodeIdentifier> nodeIdentifiers = getClusterCoordinator().getNodeIdentifiers();
        ((SocketLoadBalancedFlowFileQueue) queue).setNodeIdentifiers(nodeIdentifiers, false);

        triggerOnce(generator);

        QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
        LocalQueuePartitionDiagnostics localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();
        RemoteQueuePartitionDiagnostics remotePartitionDiagnostics = diagnostics.getRemoteQueuePartitionDiagnostics().get(0);

        assertEquals(20_000, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(1, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(10_000, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());

        assertEquals(2, remotePartitionDiagnostics.getSwapFileCount());
        assertEquals(29_000, remotePartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(1_000, remotePartitionDiagnostics.getActiveQueueSize().getObjectCount());

        assertEquals(60_000, queue.size().getObjectCount());

        // restart nifi
        restart();

        // get the new Connection with the same ID
        connection = getRootGroup().getConnection(connection.getIdentifier());
        queue = connection.getFlowFileQueue();

        // Ensure we have the correct queue sizes
        assertEquals(60_000, queue.size().getObjectCount());

        while (true) {
            triggerOnce((ProcessorNode) connection.getDestination());
            FlowFileRecord polled = queue.poll(Collections.emptySet());
            if (polled == null) {
                break;
            }

            queue.acknowledge(polled);
        }

        // Check queue sizes again
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();
        remotePartitionDiagnostics = diagnostics.getRemoteQueuePartitionDiagnostics().get(0);

        assertEquals(0, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(0, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());

        final int queueCount = remotePartitionDiagnostics.getActiveQueueSize().getObjectCount() + remotePartitionDiagnostics.getSwapQueueSize().getObjectCount()
            + remotePartitionDiagnostics.getUnacknowledgedQueueSize().getObjectCount();

        assertEquals(queueCount, queue.size().getObjectCount());
    }


    @Test(timeout = 60_000)
    public void testChangeLoadBalanceStrategyWhileDataSwapped() throws ExecutionException, InterruptedException, IOException {
        final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
        generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));

        Connection connection = connect(generator, getTerminateAllProcessor(), REL_SUCCESS);
        triggerOnce(generator);

        FlowFileQueue queue = connection.getFlowFileQueue();
        QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
        LocalQueuePartitionDiagnostics localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(20_000, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(40_000, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(60_000, queue.size().getObjectCount());

        queue.setLoadBalanceStrategy(LoadBalanceStrategy.ROUND_ROBIN, null);

        // Consume the data that remains in the local partition (half of the FlowFiles; the rest belongs to the remote partition)
        int polled = 0;
        while (polled < 30_000) {
            final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
            if (flowFile != null) {
                polled++;
                queue.acknowledge(flowFile);
            }
        }

        assertNull(queue.poll(Collections.emptySet()));

        // Check queue sizes again
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(0, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(0, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(30_000, queue.size().getObjectCount());

        queue.setLoadBalanceStrategy(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null);

        // Consume the remaining data, which is rebalanced back to the local partition once load balancing is disabled
        polled = 0;
        while (polled < 30_000) {
            final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
            if (flowFile != null) {
                polled++;
                queue.acknowledge(flowFile);
            }
        }

        assertNull(queue.poll(Collections.emptySet()));

        // Check queue sizes again
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(0, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(0, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(0, queue.size().getObjectCount());
    }


    protected FlowFileQueue createFlowFileQueue(final String uuid) {
        final ProcessScheduler processScheduler = getFlowController().getProcessScheduler();
        final ResourceClaimManager resourceClaimManager = getFlowController().getResourceClaimManager();
        final FlowFileSwapManager swapManager = getFlowController().createSwapManager();

        final AsyncLoadBalanceClientRegistry clientRegistry = Mockito.mock(AsyncLoadBalanceClientRegistry.class);

        return new SocketLoadBalancedFlowFileQueue(uuid, ConnectionEventListener.NOP_EVENT_LISTENER, processScheduler, getFlowFileRepository(), getProvenanceRepository(),
            getContentRepository(), resourceClaimManager, getClusterCoordinator(), clientRegistry, swapManager, 20000, EventReporter.NO_OP);
    }

    @Override
    protected boolean isClusteredTest() {
        return true;
    }
}
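
The expected counts asserted above follow from the swap configuration used by these tests (nifi.queue.swap.threshold=20000 in the nifi.properties below) together with a swap file size of 10,000 FlowFiles, which is assumed here as the framework's constant. A sketch of that arithmetic:

public class SwapMathSketch {
    static final int SWAP_THRESHOLD = 20_000;  // nifi.queue.swap.threshold
    static final int SWAP_FILE_SIZE = 10_000;  // FlowFiles per swap file (assumed framework constant)

    public static void main(String[] args) {
        final int total = 60_000;
        final int active = Math.min(total, SWAP_THRESHOLD);
        final int swapped = total - active;
        final int swapFiles = swapped / SWAP_FILE_SIZE;
        // Prints "20000 active, 40000 swapped across 4 swap files", matching the assertions above
        System.out.println(active + " active, " + swapped + " swapped across " + swapFiles + " swap files");
    }
}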
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.integration.swap;

import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.integration.processors.GenerateProcessor;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class StandaloneSwapFileIT extends FrameworkIntegrationTest {
    @Test
    public void testSwapOnRestart() throws ExecutionException, InterruptedException, IOException {
        final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
        generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));

        Connection connection = connect(generator, getTerminateAllProcessor(), REL_SUCCESS);
        triggerOnce(generator);

        FlowFileQueue queue = connection.getFlowFileQueue();
        QueueDiagnostics diagnostics = queue.getQueueDiagnostics();
        LocalQueuePartitionDiagnostics localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(20_000, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(40_000, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(60_000, queue.size().getObjectCount());

        // restart nifi
        restart();

        // get the new Connection with the same ID
        connection = getRootGroup().getConnection(connection.getIdentifier());
        queue = connection.getFlowFileQueue();
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        // Ensure we have the correct queue sizes
        assertEquals(20_000, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(4, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(40_000, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(60_000, queue.size().getObjectCount());

        // Consume all the data
        for (int i = 0; i < 60_000; i++) {
            final FlowFileRecord flowFile = queue.poll(Collections.emptySet());
            assertNotNull(flowFile);

            queue.acknowledge(flowFile);
        }

        assertNull(queue.poll(Collections.emptySet()));

        // Check queue sizes again
        diagnostics = queue.getQueueDiagnostics();
        localPartitionDiagnostics = diagnostics.getLocalQueuePartitionDiagnostics();

        assertEquals(0, localPartitionDiagnostics.getActiveQueueSize().getObjectCount());
        assertEquals(0, localPartitionDiagnostics.getSwapFileCount());
        assertEquals(0, localPartitionDiagnostics.getSwapQueueSize().getObjectCount());
        assertEquals(0, queue.size().getObjectCount());
    }
}
@@ -0,0 +1,256 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Core Properties #
nifi.flow.configuration.file=./target/conf/flow.xml.gz
nifi.flow.configuration.archive.enabled=true
nifi.flow.configuration.archive.dir=./target/conf/archive/
nifi.flow.configuration.archive.max.time=30 days
nifi.flow.configuration.archive.max.storage=500 MB
nifi.flow.configuration.archive.max.count=
nifi.flowcontroller.autoResumeState=true
nifi.flowcontroller.graceful.shutdown.period=10 sec
nifi.flowservice.writedelay.interval=500 ms
nifi.administrative.yield.duration=100 millis
# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
nifi.bored.yield.duration=10 millis
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB

nifi.authorizer.configuration.file=./target/conf/authorizers.xml
nifi.login.identity.provider.configuration.file=./target/conf/login-identity-providers.xml
nifi.templates.directory=./target/conf/templates
nifi.ui.banner.text=
nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./target/lib
nifi.nar.library.autoload.directory=./target/extensions
nifi.nar.working.directory=./target/work/nar/
nifi.documentation.working.directory=./target/work/docs/components

####################
# State Management #
####################
nifi.state.management.configuration.file=src/test/resources/int-tests/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
nifi.state.management.provider.cluster=zk-provider
# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
nifi.state.management.embedded.zookeeper.start=true
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=src/test/resources/int-tests/zookeeper.properties


# H2 Settings
nifi.database.directory=./target/database_repository
nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE

# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
nifi.flowfile.repository.wal.implementation=org.apache.nifi.wali.SequentialAccessWriteAheadLog
nifi.flowfile.repository.directory=./target/int-tests/flowfile_repository
nifi.flowfile.repository.partitions=256
nifi.flowfile.repository.checkpoint.interval=5 mins
nifi.flowfile.repository.always.sync=false

nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
nifi.queue.swap.threshold=20000
nifi.swap.in.period=5 sec
nifi.swap.in.threads=1
nifi.swap.out.period=5 sec
nifi.swap.out.threads=4

# Content Repository
nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
nifi.content.claim.max.appendable.size=1 MB
nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/int-tests/content_repository
nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50%
nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/

# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.encryption.key.provider.implementation=
nifi.provenance.repository.encryption.key.provider.location=
nifi.provenance.repository.encryption.key.id=
nifi.provenance.repository.encryption.key=

# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=2
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
# Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are:
# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, Relationship, Details
nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID, Relationship
# FlowFile Attributes that should be indexed and made searchable. Some examples to consider are filename, uuid, mime.type
nifi.provenance.repository.indexed.attributes=
# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
# but should provide better performance
nifi.provenance.repository.index.shard.size=500 MB
# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
nifi.provenance.repository.max.attribute.length=65536
nifi.provenance.repository.concurrent.merge.threads=2


# Volatile Provenance Repository Properties
nifi.provenance.repository.buffer.size=100000

# Component Status Repository
nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository
nifi.components.status.repository.buffer.size=1440
nifi.components.status.snapshot.frequency=1 min

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.port=
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs

# web properties #
nifi.web.war.directory=./target/lib
nifi.web.http.host=
nifi.web.http.port=8080
nifi.web.http.network.interface.default=
nifi.web.https.host=
nifi.web.https.port=
nifi.web.https.network.interface.default=
nifi.web.jetty.working.directory=./target/work/jetty
nifi.web.jetty.threads=200
nifi.web.max.header.size=16 KB
nifi.web.proxy.context.path=
nifi.web.proxy.host=

# security properties #
nifi.sensitive.props.key=
nifi.sensitive.props.key.protected=
nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
nifi.sensitive.props.provider=BC
nifi.sensitive.props.additional.keys=

nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.user.authorizer=managed-authorizer
nifi.security.user.login.identity.provider=
nifi.security.ocsp.responder.url=
nifi.security.ocsp.responder.certificate=

# OpenId Connect SSO Properties #
nifi.security.user.oidc.discovery.url=
nifi.security.user.oidc.connect.timeout=5 secs
nifi.security.user.oidc.read.timeout=5 secs
nifi.security.user.oidc.client.id=
nifi.security.user.oidc.client.secret=
nifi.security.user.oidc.preferred.jwsalgorithm=

# Apache Knox SSO Properties #
nifi.security.user.knox.url=
nifi.security.user.knox.publicKey=
nifi.security.user.knox.cookieName=hadoop-jwt
nifi.security.user.knox.audiences=

# Identity Mapping Properties #
# These properties allow normalizing user identities such that identities coming from different identity providers
# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
# DNs from certificates and principals from Kerberos into a common identity string:
#
# nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$
# nifi.security.identity.mapping.value.dn=$1@$2
# nifi.security.identity.mapping.transform.dn=NONE
# nifi.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
# nifi.security.identity.mapping.value.kerb=$1@$2
# nifi.security.identity.mapping.transform.kerb=UPPER

# Group Mapping Properties #
# These properties allow normalizing group names coming from external sources like LDAP. The following example
# lowercases any group name.
#
# nifi.security.group.mapping.pattern.anygroup=^(.*)$
# nifi.security.group.mapping.value.anygroup=$1
# nifi.security.group.mapping.transform.anygroup=LOWER

# cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=5 sec
nifi.cluster.protocol.is.secure=false

# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=localhost
nifi.cluster.node.protocol.port=0
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.protocol.max.threads=50
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.node.max.concurrent.requests=100
nifi.cluster.firewall.file=
nifi.cluster.flow.election.max.wait.time=5 mins
nifi.cluster.flow.election.max.candidates=

# cluster load balancing properties #
nifi.cluster.load.balance.host=
nifi.cluster.load.balance.port=6342
nifi.cluster.load.balance.connections.per.node=4
nifi.cluster.load.balance.max.thread.count=8
nifi.cluster.load.balance.comms.timeout=30 sec

# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=localhost:62181
nifi.zookeeper.connect.timeout=3 secs
nifi.zookeeper.session.timeout=3 secs
nifi.zookeeper.root.node=/nifi-integration-tests

# Zookeeper properties for the authentication scheme used when creating acls on znodes used for cluster management
# Values supported for nifi.zookeeper.auth.type are "default", which will apply world/anyone rights on znodes
# and "sasl" which will give rights to the sasl/kerberos identity used to authenticate the nifi node
# The identity is determined using the value in nifi.kerberos.service.principal and the removeHostFromPrincipal
# and removeRealmFromPrincipal values (which should align with the kerberos.removeHostFromPrincipal and kerberos.removeRealmFromPrincipal
# values configured on the zookeeper server).
nifi.zookeeper.auth.type=
nifi.zookeeper.kerberos.removeHostFromPrincipal=
nifi.zookeeper.kerberos.removeRealmFromPrincipal=

# kerberos #
nifi.kerberos.krb5.file=

# kerberos service principal #
nifi.kerberos.service.principal=
nifi.kerberos.service.keytab.location=

# kerberos spnego principal #
nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours

# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=
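
A minimal sketch, not part of this commit, of reading the swap threshold that the assertions in the integration tests above depend on. The file path is hypothetical, mirroring the src/test/resources/int-tests layout referenced in this file:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class SwapConfigSketch {
    public static void main(String[] args) throws IOException {
        final Properties props = new Properties();
        // Hypothetical path, following the int-tests resource layout above
        try (FileInputStream in = new FileInputStream("src/test/resources/int-tests/nifi.properties")) {
            props.load(in);
        }
        final int swapThreshold = Integer.parseInt(props.getProperty("nifi.queue.swap.threshold", "20000"));
        System.out.println("Swap threshold: " + swapThreshold); // 20000 for these tests
    }
}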
@@ -21,4 +21,12 @@
<property name="Partitions">16</property>
|
||||
<property name="Checkpoint Interval">2 mins</property>
|
||||
</local-provider>
|
||||
<cluster-provider>
|
||||
<id>zk-provider</id>
|
||||
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
|
||||
<property name="Connect String">localhost:62181</property>
|
||||
<property name="Root Node">/nifi-integration-test</property>
|
||||
<property name="Session Timeout">30 seconds</property>
|
||||
<property name="Access Control">Open</property>
|
||||
</cluster-provider>
|
||||
</stateManagement>
|
@@ -0,0 +1,45 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
#

clientPort=62181
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=target/state/zookeeper
autopurge.snapRetainCount=30

#
# Specifies the servers that are part of this zookeeper ensemble. For
# every NiFi instance running an embedded zookeeper, there needs to be
# a server entry below. For instance:
#
# server.1=nifi-node1-hostname:2888:3888
# server.2=nifi-node2-hostname:2888:3888
# server.3=nifi-node3-hostname:2888:3888
#
# The index of the server corresponds to the myid file that gets created
# in the dataDir of each node running an embedded zookeeper. See the
# administration guide for more details.
#

server.1=localhost:5777:6777