mirror of https://github.com/apache/nifi.git
NIFI-7706, NIFI-5702: Allow NiFi to keep FlowFiles if their queue is unknown. This way, if a Flow is inadvertently removed, updated, etc., and NiFi is restarted, the data will not be dropped by default. The old behavior of dropping the data can still be enabled via a property.
This closes #4454. Signed-off-by: Bryan Bende <bbende@apache.org>
Parent: 6596fb1f87
Commit: eca7f153d0
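The behavior described in the commit message is controlled by the new nifi.flowfile.repository.retain.orphaned.flowfiles property introduced below (see the WriteAheadFlowFileRepository and nifi.properties changes). A minimal configuration sketch, assuming a standard nifi.properties file; the default is true, and setting it to false restores the previous behavior of dropping orphaned FlowFiles on restart:

# nifi.properties (sketch; property added by this commit)
# Keep FlowFiles whose queue no longer exists in the flow (the new default behavior)
nifi.flowfile.repository.retain.orphaned.flowfiles=true
# Set to false to restore the old behavior of dropping such FlowFiles on restart
# nifi.flowfile.repository.retain.orphaned.flowfiles=false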
@@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -178,4 +179,13 @@ public interface FlowFileRepository extends Closeable {
throws IOException {
return null;
}

/**
* Returns the set of Resource Claims that are referenced by FlowFiles that have been "orphaned" because they belong to FlowFile Queues/Connections
* that did not exist in the flow when NiFi started
* @return the set of orphaned Resource Claims
*/
default Set<ResourceClaim> findOrphanedResourceClaims() {
return Collections.emptySet();
}
}

@@ -327,6 +327,11 @@ public interface FlowManager {

ParameterContextManager getParameterContextManager();

/**
* @return the number of each type of component (Processor, Controller Service, Process Group, Funnel, Input Port, Output Port, Reporting Task, Remote Process Group)
*/
Map<String, Integer> getComponentCounts();

/**
* Purges all components from the flow, including:
*

@@ -1438,7 +1438,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}

flowSynchronized.set(true);
LOG.info("Successfully synchronized controller with proposed flow");
LOG.info("Successfully synchronized controller with proposed flow. Flow contains the following number of components: {}", flowManager.getComponentCounts());
} finally {
writeLock.unlock("synchronize");
}

@@ -87,6 +87,7 @@ import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@@ -218,6 +219,10 @@ public class StandardFlowManager implements FlowManager {
}

public void setRootGroup(final ProcessGroup rootGroup) {
if (this.rootGroup != null && this.rootGroup.isEmpty()) {
allProcessGroups.remove(this.rootGroup.getIdentifier());
}

this.rootGroup = rootGroup;
allProcessGroups.put(ROOT_GROUP_ID_ALIAS, rootGroup);
allProcessGroups.put(rootGroup.getIdentifier(), rootGroup);

@@ -738,6 +743,43 @@ public class StandardFlowManager implements FlowManager {
return parameterContextManager;
}

@Override
public Map<String, Integer> getComponentCounts() {
final Map<String, Integer> componentCounts = new LinkedHashMap<>();
componentCounts.put("Processors", allProcessors.size());
componentCounts.put("Controller Services", getAllControllerServices().size());
componentCounts.put("Reporting Tasks", getAllReportingTasks().size());
componentCounts.put("Process Groups", allProcessGroups.size() - 2); // -2 to account for the root group because we don't want it in our counts and the 'root group alias' key.
componentCounts.put("Remote Process Groups", getRootGroup().findAllRemoteProcessGroups().size());

int localInputPorts = 0;
int publicInputPorts = 0;
for (final Port port : allInputPorts.values()) {
if (port instanceof PublicPort) {
publicInputPorts++;
} else {
localInputPorts++;
}
}

int localOutputPorts = 0;
int publicOutputPorts = 0;
for (final Port port : allOutputPorts.values()) {
if (port instanceof PublicPort) {
localOutputPorts++;
} else {
publicOutputPorts++;
}
}

componentCounts.put("Local Input Ports", localInputPorts);
componentCounts.put("Local Output Ports", localOutputPorts);
componentCounts.put("Public Input Ports", publicInputPorts);
componentCounts.put("Public Output Ports", publicOutputPorts);

return componentCounts;
}

@Override
public ParameterContext createParameterContext(final String id, final String name, final Map<String, Parameter> parameters) {
final boolean namingConflict = parameterContextManager.getParameterContexts().stream()

@@ -16,6 +16,22 @@
*/
package org.apache.nifi.controller.repository;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

@@ -42,21 +58,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;

/**
* <p>

@@ -84,6 +85,7 @@ import org.wali.WriteAheadRepository;
public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";

static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";

@@ -95,6 +97,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis

final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
private final boolean alwaysSync;
private final boolean retainOrphanedFlowFiles;

private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
volatile ScheduledFuture<?> checkpointFuture;

@@ -105,6 +108,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final ScheduledExecutorService checkpointExecutor;

private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());

private final Set<String> swapLocationSuffixes = new HashSet<>(); // guarded by synchronizing on object itself

@@ -145,12 +149,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
checkpointExecutor = null;
walImplementation = null;
nifiProperties = null;
retainOrphanedFlowFiles = true;
}

public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false"));
this.nifiProperties = nifiProperties;

final String orphanedFlowFileProperty = nifiProperties.getProperty(RETAIN_ORPHANED_FLOWFILES);
retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty);

// determine the database file path and ensure it exists
String writeAheadLogImpl = nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
if (writeAheadLogImpl == null) {

@@ -865,6 +873,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
queueMap.put(queue.getIdentifier(), queue);
}

final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
int numFlowFilesMissingQueue = 0;
long maxId = 0;
for (final SerializedRepositoryRecord record : recordList) {

@@ -876,23 +885,52 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final String queueId = record.getQueueIdentifier();
if (queueId == null) {
numFlowFilesMissingQueue++;
logger.warn("Encounted Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
logger.warn("Encountered Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);

// Add a drop record so that the record is not retained
dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
.flowFileRecord(record.getFlowFileRecord())
.swapLocation(record.getSwapLocation())
.type(RepositoryRecordType.DELETE)
.build());

continue;
}

final ContentClaim claim = record.getContentClaim();
final FlowFileQueue flowFileQueue = queueMap.get(queueId);
if (flowFileQueue == null) {
final boolean orphaned = flowFileQueue == null;
if (orphaned) {
numFlowFilesMissingQueue++;
logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. Dropping this FlowFile", recordId, queueId);

if (isRetainOrphanedFlowFiles()) {
if (claim == null) {
logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any "
+ "FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored.", recordId, queueId);
} else {
claimManager.incrementClaimantCount(claim.getResourceClaim());
orphanedResourceClaims.add(claim.getResourceClaim());
logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. "
+ "This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in "
+ "case the flow containing this queue is later restored. This may result in the following Content Claim not being cleaned "
+ "up by the Content Repository: {}", recordId, queueId, claim);
}
} else {
dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
.flowFileRecord(record.getFlowFileRecord())
.swapLocation(record.getSwapLocation())
.type(RepositoryRecordType.DELETE)
.build());

logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will be dropped.", recordId, queueId);
}

continue;
} else if (claim != null) {
claimManager.incrementClaimantCount(claim.getResourceClaim());
}

flowFileQueue.put(record.getFlowFileRecord());

final ContentClaim claim = record.getContentClaim();
if (claim != null) {
claimManager.incrementClaimantCount(claim.getResourceClaim());
}
}

// If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected.

@@ -903,7 +941,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
flowFileSequenceGenerator.set(maxId + 1);
logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
if (numFlowFilesMissingQueue > 0) {
logger.warn("On recovery, found {} FlowFiles whose queue no longer exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
logger.warn("On recovery, found {} FlowFiles whose queues no longer exists.", numFlowFilesMissingQueue);
}

if (dropRecords.isEmpty()) {
logger.debug("No Drop Records to update Repository with");
} else {
final long updateStart = System.nanoTime();
wal.update(dropRecords, true);
final long updateEnd = System.nanoTime();
final long updateMillis = TimeUnit.MILLISECONDS.convert(updateEnd - updateStart, TimeUnit.NANOSECONDS);
logger.info("Successfully updated FlowFile Repository with {} Drop Records due to missing queues in {} milliseconds", dropRecords.size(), updateMillis);
}

final Runnable checkpointRunnable = new Runnable() {

@@ -927,6 +975,15 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
return maxId;
}

private boolean isRetainOrphanedFlowFiles() {
return retainOrphanedFlowFiles;
}

@Override
public Set<ResourceClaim> findOrphanedResourceClaims() {
return Collections.unmodifiableSet(orphanedResourceClaims);
}

@Override
public void updateMaxFlowFileIdentifier(final long maxId) {
while (true) {

@@ -86,6 +86,19 @@ public class ContentRepositoryScanTask implements DiagnosticTask {
}
}

details.add(""); // Insert empty detail lines to make output more readable.

final Set<ResourceClaim> orphanedResourceClaims = flowFileRepository.findOrphanedResourceClaims();
if (orphanedResourceClaims == null || orphanedResourceClaims.isEmpty()) {
details.add("No Resource Claims were referenced by orphaned FlowFiles.");
} else {
details.add("The following Resource Claims were referenced by orphaned FlowFiles (FlowFiles that exist in the FlowFile Repository but for which the FlowFile's connection/queue" +
" did not exist when NiFi started):");

for (final ResourceClaim claim : orphanedResourceClaims) {
details.add(claim.toString());
}
}

return new StandardDiagnosticsDumpElement("Content Repository Scan", details);
}

@@ -62,6 +62,7 @@
<nifi.flowfile.repository.encryption.key.provider.location />
<nifi.flowfile.repository.encryption.key.id />
<nifi.flowfile.repository.encryption.key />
<nifi.flowfile.repository.retain.orphaned.flowfiles>true</nifi.flowfile.repository.retain.orphaned.flowfiles>
<nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
<nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>

@@ -67,6 +67,7 @@ nifi.flowfile.repository.encryption.key.provider.implementation=${nifi.flowfile.
nifi.flowfile.repository.encryption.key.provider.location=${nifi.flowfile.repository.encryption.key.provider.location}
nifi.flowfile.repository.encryption.key.id=${nifi.flowfile.repository.encryption.key.id}
nifi.flowfile.repository.encryption.key=${nifi.flowfile.repository.encryption.key}
nifi.flowfile.repository.retain.orphaned.flowfiles=${nifi.flowfile.repository.retain.orphaned.flowfiles}

nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
nifi.queue.swap.threshold=${nifi.queue.swap.threshold}

@@ -167,6 +167,12 @@
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-server-nar</artifactId>
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-provenance-repository-nar</artifactId>

@@ -192,6 +198,7 @@
<type>nar</type>
</dependency>

<!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
<!-- TODO: remove these once minimum Java version is 11 -->
<dependency>

@@ -18,6 +18,7 @@ package org.apache.nifi.tests.system;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

@@ -31,9 +32,23 @@ public class AggregateNiFiInstance implements NiFiInstance {

@Override
public void start(boolean waitForCompletion) {
final Map<Thread, NiFiInstance> startupThreads = new HashMap<>();

for (final NiFiInstance instance : instances) {
if (instance.isAutoStart()) {
instance.start(waitForCompletion);
final Thread t = new Thread(() -> instance.start(waitForCompletion));
t.start();
startupThreads.put(t, instance);
}
}

for (final Map.Entry<Thread, NiFiInstance> entry : startupThreads.entrySet()) {
final Thread startupThread = entry.getKey();

try {
startupThread.join();
} catch (final InterruptedException ie) {
throw new RuntimeException("Interrupted while waiting for instance " + entry.getValue() + " to finish starting");
}
}
}

@@ -82,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

@@ -774,6 +775,23 @@ public class NiFiClientUtil {
return flowFileEntity;
}

public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
final ListingRequestEntity listing = performQueueListing(connectionId);
final List<FlowFileSummaryDTO> flowFileSummaries = listing.getListingRequest().getFlowFileSummaries();
if (flowFileIndex >= flowFileSummaries.size()) {
throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + flowFileSummaries.size() + " FlowFiles");
}

final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex);
final String uuid = flowFileSummary.getUuid();
final String nodeId = flowFileSummary.getClusterNodeId();

final FlowFileEntity flowFileEntity = nifiClient.getConnectionClient().getFlowFile(connectionId, uuid, nodeId);
flowFileEntity.getFlowFile().setClusterNodeId(nodeId);

return nifiClient.getConnectionClient().getFlowFileContent(connectionId, uuid, nodeId);
}

public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException {
final Set<VariableEntity> variableEntities = new HashSet<>();
for (final Map.Entry<String, String> entry : variables.entrySet()) {

@@ -16,7 +16,6 @@
*/
package org.apache.nifi.tests.system;

import org.apache.nifi.processor.Relationship;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;

@@ -60,15 +59,10 @@ public abstract class NiFiSystemIT {
private static final File LIB_DIR = new File("target/nifi-lib-assembly/lib");
private static volatile String nifiFrameworkVersion = null;

protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Convenience Relationship for use in tests")
.build();

@Rule
public TestName name = new TestName();
@Rule
public Timeout defaultTimeout = new Timeout(2, TimeUnit.MINUTES);
public Timeout defaultTimeout = new Timeout(5, TimeUnit.MINUTES);

private NiFiClient nifiClient;
private NiFiClientUtil clientUtil;

@@ -78,6 +78,10 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
}
}

public String toString() {
return "RunNiFiInstance[dir=" + instanceDirectory + "]";
}

@Override
public void start(final boolean waitForCompletion) {
if (runNiFi != null) {

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.tests.system.restart;

import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.tests.system.NiFiInstance;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class FlowFileRestorationIT extends NiFiSystemIT {

@Test
public void testDataInMissingQueueRestoredWhenQueueRestored() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generator = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorProperties(generator, Collections.singletonMap("File Size", "1 KB"));
getClientUtil().updateProcessorSchedulingPeriod(generator, "100 min");

final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");

getNifiClient().getProcessorClient().startProcessor(generator);
waitForQueueCount(connection.getId(), 1);
getNifiClient().getProcessorClient().stopProcessor(generator);

final byte[] flowFileContents = getFlowFileContents(connection.getId(), 0);

assertEquals(1024, flowFileContents.length);

final NiFiInstance nifiInstance = getNiFiInstance();
nifiInstance.stop();

final File nifiHome = nifiInstance.getInstanceDirectory();
final File confDir = new File(nifiHome, "conf");
final File flowXmlGz = new File(confDir, "flow.xml.gz");
final byte[] flowXmlGzBytes = Files.readAllBytes(flowXmlGz.toPath());
assertTrue(flowXmlGz.delete());

nifiInstance.start();

try {
getNifiClient().getConnectionClient().getConnection(connection.getId());
Assert.fail("Didn't expect to retrieve a connection");
} catch (final NiFiClientException nfce) {
// Expected because the connection no longer exists.
}

// Stop the instance, restore the flow.xml.gz, and restart
nifiInstance.stop();
Files.write(flowXmlGz.toPath(), flowXmlGzBytes, StandardOpenOption.CREATE);
nifiInstance.start();

// Ensure that there's a FlowFile queued up and that its contents are still accessible and have not changed.
final ConnectionEntity retrievedConnection = getNifiClient().getConnectionClient().getConnection(connection.getId());
assertNotNull(retrievedConnection);
waitForQueueCount(connection.getId(), 1);
final byte[] contentsAfterRestart = getFlowFileContents(connection.getId(), 0);

assertArrayEquals(flowFileContents, contentsAfterRestart);
}

private byte[] getFlowFileContents(final String connectionId, final int flowFileIndex) throws IOException, NiFiClientException {
final byte[] flowFileContents;
try (final InputStream in = getClientUtil().getFlowFileContent(connectionId, flowFileIndex);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

StreamUtils.copy(in, baos);
return baos.toByteArray();
}
}
}

@@ -24,7 +24,7 @@ graceful.shutdown.seconds=20

# JVM memory settings
java.arg.2= -Xms128m
java.arg.3=-Xmx128m
java.arg.3=-Xmx256m

java.arg.14=-Djava.awt.headless=true

@@ -24,7 +24,7 @@ graceful.shutdown.seconds=20

# JVM memory settings
java.arg.2= -Xms128m
java.arg.3=-Xmx128m
java.arg.3=-Xmx256m

java.arg.14=-Djava.awt.headless=true

@@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;

import java.io.IOException;
import java.io.InputStream;

public interface ConnectionClient {
ConnectionEntity getConnection(String id) throws NiFiClientException, IOException;

@@ -49,4 +50,6 @@ public interface ConnectionClient {
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException;

FlowFileEntity getFlowFile(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;

InputStream getFlowFileContent(String connectionId, String flowFileUuid, String nodeId) throws NiFiClientException, IOException;
}

@@ -28,6 +28,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;

@@ -271,4 +272,27 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn
return getRequestBuilder(target).get(FlowFileEntity.class);
});
}

@Override
public InputStream getFlowFileContent(final String connectionId, final String flowFileUuid, final String nodeId) throws NiFiClientException, IOException {
if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null");
}
if (flowFileUuid == null) {
throw new IllegalArgumentException("FlowFile UUID cannot be null");
}

return executeAction("Error retrieving FlowFile Content", () -> {
WebTarget target = flowFileQueueTarget
.path("flowfiles/{uuid}/content")
.resolveTemplate("id", connectionId)
.resolveTemplate("uuid", flowFileUuid);

if (nodeId != null) {
target = target.queryParam("clusterNodeId", nodeId);
}

return getRequestBuilder(target).get(InputStream.class);
});
}
}